Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * vacuumparallel.c
4 : : * Support routines for parallel vacuum execution.
5 : : *
6 : : * This file contains routines that are intended to support setting up, using,
7 : : * and tearing down a ParallelVacuumState.
8 : : *
9 : : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10 : : * with parallel worker processes. Individual indexes are processed by one
11 : : * vacuum process. ParallelVacuumState contains shared information as well as
12 : : * the memory space for storing dead items allocated in the DSA area. We
13 : : * launch parallel worker processes at the start of parallel index
14 : : * bulk-deletion and index cleanup and once all indexes are processed, the
15 : : * parallel worker processes exit. Each time we process indexes in parallel,
16 : : * the parallel context is re-initialized so that the same DSM can be used for
17 : : * multiple passes of index bulk-deletion and index cleanup.
18 : : *
19 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
20 : : * Portions Copyright (c) 1994, Regents of the University of California
21 : : *
22 : : * IDENTIFICATION
23 : : * src/backend/commands/vacuumparallel.c
24 : : *
25 : : *-------------------------------------------------------------------------
26 : : */
27 : : #include "postgres.h"
28 : :
29 : : #include "access/amapi.h"
30 : : #include "access/table.h"
31 : : #include "access/xact.h"
32 : : #include "commands/progress.h"
33 : : #include "commands/vacuum.h"
34 : : #include "executor/instrument.h"
35 : : #include "optimizer/paths.h"
36 : : #include "pgstat.h"
37 : : #include "storage/bufmgr.h"
38 : : #include "tcop/tcopprot.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : :
/*
 * Keys identifying the chunks that parallel vacuum stores in its DSM segment.
 * Other parallel execution code has to avoid clashing with plan_node_id
 * values; parallel vacuum has no such concern, so small integers are fine.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		2
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	3
#define PARALLEL_VACUUM_KEY_WAL_USAGE		4
#define PARALLEL_VACUUM_KEY_INDEX_STATS		5
52 : :
53 : : /*
54 : : * Shared information among parallel workers. So this is allocated in the DSM
55 : : * segment.
56 : : */
57 : : typedef struct PVShared
58 : : {
59 : : /*
60 : : * Target table relid and log level (for messages about parallel workers
61 : : * launched during VACUUM VERBOSE). These fields are not modified during
62 : : * the parallel vacuum.
63 : : */
64 : : Oid relid;
65 : : int elevel;
66 : :
67 : : /*
68 : : * Fields for both index vacuum and cleanup.
69 : : *
70 : : * reltuples is the total number of input heap tuples. We set either old
71 : : * live tuples in the index vacuum case or the new live tuples in the
72 : : * index cleanup case.
73 : : *
74 : : * estimated_count is true if reltuples is an estimated value. (Note that
75 : : * reltuples could be -1 in this case, indicating we have no idea.)
76 : : */
77 : : double reltuples;
78 : : bool estimated_count;
79 : :
80 : : /*
81 : : * In single process vacuum we could consume more memory during index
82 : : * vacuuming or cleanup apart from the memory for heap scanning. In
83 : : * parallel vacuum, since individual vacuum workers can consume memory
84 : : * equal to maintenance_work_mem, the new maintenance_work_mem for each
85 : : * worker is set such that the parallel operation doesn't consume more
86 : : * memory than single process vacuum.
87 : : */
88 : : int maintenance_work_mem_worker;
89 : :
90 : : /*
91 : : * The number of buffers each worker's Buffer Access Strategy ring should
92 : : * contain.
93 : : */
94 : : int ring_nbuffers;
95 : :
96 : : /*
97 : : * Shared vacuum cost balance. During parallel vacuum,
98 : : * VacuumSharedCostBalance points to this value and it accumulates the
99 : : * balance of each parallel vacuum worker.
100 : : */
101 : : pg_atomic_uint32 cost_balance;
102 : :
103 : : /*
104 : : * Number of active parallel workers. This is used for computing the
105 : : * minimum threshold of the vacuum cost balance before a worker sleeps for
106 : : * cost-based delay.
107 : : */
108 : : pg_atomic_uint32 active_nworkers;
109 : :
110 : : /* Counter for vacuuming and cleanup */
111 : : pg_atomic_uint32 idx;
112 : :
113 : : /* DSA handle where the TidStore lives */
114 : : dsa_handle dead_items_dsa_handle;
115 : :
116 : : /* DSA pointer to the shared TidStore */
117 : : dsa_pointer dead_items_handle;
118 : :
119 : : /* Statistics of shared dead items */
120 : : VacDeadItemsInfo dead_items_info;
121 : : } PVShared;
122 : :
/* Per-index status tracked during a parallel index vacuum or cleanup pass */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0, /* no pass in progress */
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE = 1, /* bulk-deletion pending */
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP = 2,	/* cleanup pending */
	PARALLEL_INDVAC_STATUS_COMPLETED = 3,	/* index has been processed */
} PVIndVacStatus;
131 : :
132 : : /*
133 : : * Struct for index vacuum statistics of an index that is used for parallel vacuum.
134 : : * This includes the status of parallel index vacuum as well as index statistics.
135 : : */
136 : : typedef struct PVIndStats
137 : : {
138 : : /*
139 : : * The following two fields are set by leader process before executing
140 : : * parallel index vacuum or parallel index cleanup. These fields are not
141 : : * fixed for the entire VACUUM operation. They are only fixed for an
142 : : * individual parallel index vacuum and cleanup.
143 : : *
144 : : * parallel_workers_can_process is true if both leader and worker can
145 : : * process the index, otherwise only leader can process it.
146 : : */
147 : : PVIndVacStatus status;
148 : : bool parallel_workers_can_process;
149 : :
150 : : /*
151 : : * Individual worker or leader stores the result of index vacuum or
152 : : * cleanup.
153 : : */
154 : : bool istat_updated; /* are the stats updated? */
155 : : IndexBulkDeleteResult istat;
156 : : } PVIndStats;
157 : :
158 : : /*
159 : : * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
160 : : */
161 : : struct ParallelVacuumState
162 : : {
163 : : /* NULL for worker processes */
164 : : ParallelContext *pcxt;
165 : :
166 : : /* Parent Heap Relation */
167 : : Relation heaprel;
168 : :
169 : : /* Target indexes */
170 : : Relation *indrels;
171 : : int nindexes;
172 : :
173 : : /* Shared information among parallel vacuum workers */
174 : : PVShared *shared;
175 : :
176 : : /*
177 : : * Shared index statistics among parallel vacuum workers. The array
178 : : * element is allocated for every index, even those indexes where parallel
179 : : * index vacuuming is unsafe or not worthwhile (e.g.,
180 : : * will_parallel_vacuum[] is false). During parallel vacuum,
181 : : * IndexBulkDeleteResult of each index is kept in DSM and is copied into
182 : : * local memory at the end of parallel vacuum.
183 : : */
184 : : PVIndStats *indstats;
185 : :
186 : : /* Shared dead items space among parallel vacuum workers */
187 : : TidStore *dead_items;
188 : :
189 : : /* Points to buffer usage area in DSM */
190 : : BufferUsage *buffer_usage;
191 : :
192 : : /* Points to WAL usage area in DSM */
193 : : WalUsage *wal_usage;
194 : :
195 : : /*
196 : : * False if the index is totally unsuitable target for all parallel
197 : : * processing. For example, the index could be <
198 : : * min_parallel_index_scan_size cutoff.
199 : : */
200 : : bool *will_parallel_vacuum;
201 : :
202 : : /*
203 : : * The number of indexes that support parallel index bulk-deletion and
204 : : * parallel index cleanup respectively.
205 : : */
206 : : int nindexes_parallel_bulkdel;
207 : : int nindexes_parallel_cleanup;
208 : : int nindexes_parallel_condcleanup;
209 : :
210 : : /* Buffer access strategy used by leader process */
211 : : BufferAccessStrategy bstrategy;
212 : :
213 : : /*
214 : : * Error reporting state. The error callback is set only for workers
215 : : * processes during parallel index vacuum.
216 : : */
217 : : char *relnamespace;
218 : : char *relname;
219 : : char *indname;
220 : : PVIndVacStatus status;
221 : : };
222 : :
223 : : static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
224 : : bool *will_parallel_vacuum);
225 : : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
226 : : bool vacuum);
227 : : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
228 : : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
229 : : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
230 : : PVIndStats *indstats);
231 : : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
232 : : bool vacuum);
233 : : static void parallel_vacuum_error_callback(void *arg);
234 : :
235 : : /*
236 : : * Try to enter parallel mode and create a parallel context. Then initialize
237 : : * shared memory state.
238 : : *
239 : : * On success, return parallel vacuum state. Otherwise return NULL.
240 : : */
241 : : ParallelVacuumState *
843 akapila@postgresql.o 242 :CBC 3617 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
243 : : int nrequested_workers, int vac_work_mem,
244 : : int elevel, BufferAccessStrategy bstrategy)
245 : : {
246 : : ParallelVacuumState *pvs;
247 : : ParallelContext *pcxt;
248 : : PVShared *shared;
249 : : TidStore *dead_items;
250 : : PVIndStats *indstats;
251 : : BufferUsage *buffer_usage;
252 : : WalUsage *wal_usage;
253 : : bool *will_parallel_vacuum;
254 : : Size est_indstats_len;
255 : : Size est_shared_len;
256 : 3617 : int nindexes_mwm = 0;
257 : 3617 : int parallel_workers = 0;
258 : : int querylen;
259 : :
260 : : /*
261 : : * A parallel vacuum must be requested and there must be indexes on the
262 : : * relation
263 : : */
264 [ - + ]: 3617 : Assert(nrequested_workers >= 0);
265 [ - + ]: 3617 : Assert(nindexes > 0);
266 : :
267 : : /*
268 : : * Compute the number of parallel vacuum workers to launch
269 : : */
270 : 3617 : will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
271 : 3617 : parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
272 : : nrequested_workers,
273 : : will_parallel_vacuum);
274 [ + + ]: 3617 : if (parallel_workers <= 0)
275 : : {
276 : : /* Can't perform vacuum in parallel -- return NULL */
277 : 3608 : pfree(will_parallel_vacuum);
278 : 3608 : return NULL;
279 : : }
280 : :
281 : 9 : pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
282 : 9 : pvs->indrels = indrels;
283 : 9 : pvs->nindexes = nindexes;
284 : 9 : pvs->will_parallel_vacuum = will_parallel_vacuum;
285 : 9 : pvs->bstrategy = bstrategy;
379 andres@anarazel.de 286 : 9 : pvs->heaprel = rel;
287 : :
843 akapila@postgresql.o 288 : 9 : EnterParallelMode();
289 : 9 : pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
290 : : parallel_workers);
291 [ - + ]: 9 : Assert(pcxt->nworkers > 0);
292 : 9 : pvs->pcxt = pcxt;
293 : :
294 : : /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
295 : 9 : est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
296 : 9 : shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
297 : 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
298 : :
299 : : /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
300 : 9 : est_shared_len = sizeof(PVShared);
301 : 9 : shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
302 : 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
303 : :
304 : : /*
305 : : * Estimate space for BufferUsage and WalUsage --
306 : : * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
307 : : *
308 : : * If there are no extensions loaded that care, we could skip this. We
309 : : * have no way of knowing whether anyone's looking at pgBufferUsage or
310 : : * pgWalUsage, so do it unconditionally.
311 : : */
312 : 9 : shm_toc_estimate_chunk(&pcxt->estimator,
313 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
314 : 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
315 : 9 : shm_toc_estimate_chunk(&pcxt->estimator,
316 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
317 : 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
318 : :
319 : : /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
320 [ + - ]: 9 : if (debug_query_string)
321 : : {
322 : 9 : querylen = strlen(debug_query_string);
323 : 9 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
324 : 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
325 : : }
326 : : else
843 akapila@postgresql.o 327 :UBC 0 : querylen = 0; /* keep compiler quiet */
328 : :
843 akapila@postgresql.o 329 :CBC 9 : InitializeParallelDSM(pcxt);
330 : :
331 : : /* Prepare index vacuum stats */
332 : 9 : indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
605 pg@bowt.ie 333 [ + - + - : 279 : MemSet(indstats, 0, est_indstats_len);
+ - + - +
+ ]
843 akapila@postgresql.o 334 [ + + ]: 54 : for (int i = 0; i < nindexes; i++)
335 : : {
336 : 45 : Relation indrel = indrels[i];
337 : 45 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
338 : :
339 : : /*
340 : : * Cleanup option should be either disabled, always performing in
341 : : * parallel or conditionally performing in parallel.
342 : : */
343 [ + + - + ]: 45 : Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
344 : : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
345 [ - + ]: 45 : Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
346 : :
347 [ + + ]: 45 : if (!will_parallel_vacuum[i])
348 : 3 : continue;
349 : :
350 [ + + ]: 42 : if (indrel->rd_indam->amusemaintenanceworkmem)
351 : 6 : nindexes_mwm++;
352 : :
353 : : /*
354 : : * Remember the number of indexes that support parallel operation for
355 : : * each phase.
356 : : */
357 [ + + ]: 42 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
358 : 36 : pvs->nindexes_parallel_bulkdel++;
359 [ + + ]: 42 : if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
360 : 12 : pvs->nindexes_parallel_cleanup++;
361 [ + + ]: 42 : if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
362 : 24 : pvs->nindexes_parallel_condcleanup++;
363 : : }
364 : 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
365 : 9 : pvs->indstats = indstats;
366 : :
367 : : /* Prepare shared information */
368 : 9 : shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
369 [ + - + - : 90 : MemSet(shared, 0, est_shared_len);
+ - + - +
+ ]
370 : 9 : shared->relid = RelationGetRelid(rel);
371 : 9 : shared->elevel = elevel;
372 : 9 : shared->maintenance_work_mem_worker =
373 : : (nindexes_mwm > 0) ?
374 [ + + ]: 9 : maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
375 : : maintenance_work_mem;
12 msawada@postgresql.o 376 :GNC 9 : shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
377 : :
378 : : /* Prepare DSA space for dead items */
379 : 9 : dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
380 : : LWTRANCHE_PARALLEL_VACUUM_DSA);
381 : 9 : pvs->dead_items = dead_items;
382 : 9 : shared->dead_items_handle = TidStoreGetHandle(dead_items);
383 : 9 : shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
384 : :
385 : : /* Use the same buffer size for all workers */
373 drowley@postgresql.o 386 :CBC 9 : shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
387 : :
843 akapila@postgresql.o 388 : 9 : pg_atomic_init_u32(&(shared->cost_balance), 0);
389 : 9 : pg_atomic_init_u32(&(shared->active_nworkers), 0);
390 : 9 : pg_atomic_init_u32(&(shared->idx), 0);
391 : :
392 : 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
393 : 9 : pvs->shared = shared;
394 : :
395 : : /*
396 : : * Allocate space for each worker's BufferUsage and WalUsage; no need to
397 : : * initialize
398 : : */
399 : 9 : buffer_usage = shm_toc_allocate(pcxt->toc,
400 : 9 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
401 : 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
402 : 9 : pvs->buffer_usage = buffer_usage;
403 : 9 : wal_usage = shm_toc_allocate(pcxt->toc,
404 : 9 : mul_size(sizeof(WalUsage), pcxt->nworkers));
405 : 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
406 : 9 : pvs->wal_usage = wal_usage;
407 : :
408 : : /* Store query string for workers */
409 [ + - ]: 9 : if (debug_query_string)
410 : : {
411 : : char *sharedquery;
412 : :
413 : 9 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
414 : 9 : memcpy(sharedquery, debug_query_string, querylen + 1);
415 : 9 : sharedquery[querylen] = '\0';
416 : 9 : shm_toc_insert(pcxt->toc,
417 : : PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
418 : : }
419 : :
420 : : /* Success -- return parallel vacuum state */
421 : 9 : return pvs;
422 : : }
423 : :
424 : : /*
425 : : * Destroy the parallel context, and end parallel mode.
426 : : *
427 : : * Since writes are not allowed during parallel mode, copy the
428 : : * updated index statistics from DSM into local memory and then later use that
429 : : * to update the index statistics. One might think that we can exit from
430 : : * parallel mode, update the index statistics and then destroy parallel
431 : : * context, but that won't be safe (see ExitParallelMode).
432 : : */
433 : : void
434 : 9 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
435 : : {
436 [ - + ]: 9 : Assert(!IsParallelWorker());
437 : :
438 : : /* Copy the updated statistics */
439 [ + + ]: 54 : for (int i = 0; i < pvs->nindexes; i++)
440 : : {
441 : 45 : PVIndStats *indstats = &(pvs->indstats[i]);
442 : :
443 [ + + ]: 45 : if (indstats->istat_updated)
444 : : {
445 : 35 : istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
446 : 35 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
447 : : }
448 : : else
449 : 10 : istats[i] = NULL;
450 : : }
451 : :
12 msawada@postgresql.o 452 :GNC 9 : TidStoreDestroy(pvs->dead_items);
453 : :
843 akapila@postgresql.o 454 :CBC 9 : DestroyParallelContext(pvs->pcxt);
455 : 9 : ExitParallelMode();
456 : :
457 : 9 : pfree(pvs->will_parallel_vacuum);
458 : 9 : pfree(pvs);
459 : 9 : }
460 : :
461 : : /*
462 : : * Returns the dead items space and dead items information.
463 : : */
464 : : TidStore *
12 msawada@postgresql.o 465 :GNC 9 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
466 : : {
467 : 9 : *dead_items_info_p = &(pvs->shared->dead_items_info);
843 akapila@postgresql.o 468 :CBC 9 : return pvs->dead_items;
469 : : }
470 : :
471 : : /* Forget all items in dead_items */
472 : : void
12 msawada@postgresql.o 473 :GNC 4 : parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
474 : : {
475 : 4 : TidStore *dead_items = pvs->dead_items;
476 : 4 : VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
477 : :
478 : : /*
479 : : * Free the current tidstore and return allocated DSA segments to the
480 : : * operating system. Then we recreate the tidstore with the same max_bytes
481 : : * limitation we just used.
482 : : */
483 : 4 : TidStoreDestroy(dead_items);
484 : 4 : pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
485 : : LWTRANCHE_PARALLEL_VACUUM_DSA);
486 : :
487 : : /* Update the DSA pointer for dead_items to the new one */
488 : 4 : pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
489 : 4 : pvs->shared->dead_items_handle = TidStoreGetHandle(dead_items);
490 : :
491 : : /* Reset the counter */
492 : 4 : dead_items_info->num_items = 0;
493 : 4 : }
494 : :
495 : : /*
496 : : * Do parallel index bulk-deletion with parallel workers.
497 : : */
498 : : void
843 akapila@postgresql.o 499 :CBC 4 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
500 : : int num_index_scans)
501 : : {
502 [ - + ]: 4 : Assert(!IsParallelWorker());
503 : :
504 : : /*
505 : : * We can only provide an approximate value of num_heap_tuples, at least
506 : : * for now.
507 : : */
508 : 4 : pvs->shared->reltuples = num_table_tuples;
509 : 4 : pvs->shared->estimated_count = true;
510 : :
511 : 4 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
512 : 4 : }
513 : :
514 : : /*
515 : : * Do parallel index cleanup with parallel workers.
516 : : */
517 : : void
518 : 9 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
519 : : int num_index_scans, bool estimated_count)
520 : : {
521 [ - + ]: 9 : Assert(!IsParallelWorker());
522 : :
523 : : /*
524 : : * We can provide a better estimate of total number of surviving tuples
525 : : * (we assume indexes are more interested in that than in the number of
526 : : * nominally live tuples).
527 : : */
528 : 9 : pvs->shared->reltuples = num_table_tuples;
529 : 9 : pvs->shared->estimated_count = estimated_count;
530 : :
531 : 9 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
532 : 9 : }
533 : :
534 : : /*
535 : : * Compute the number of parallel worker processes to request. Both index
536 : : * vacuum and index cleanup can be executed with parallel workers.
537 : : * The index is eligible for parallel vacuum iff its size is greater than
538 : : * min_parallel_index_scan_size as invoking workers for very small indexes
539 : : * can hurt performance.
540 : : *
541 : : * nrequested is the number of parallel workers that user requested. If
542 : : * nrequested is 0, we compute the parallel degree based on nindexes, that is
543 : : * the number of indexes that support parallel vacuum. This function also
544 : : * sets will_parallel_vacuum to remember indexes that participate in parallel
545 : : * vacuum.
546 : : */
547 : : static int
548 : 3617 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
549 : : bool *will_parallel_vacuum)
550 : : {
551 : 3617 : int nindexes_parallel = 0;
552 : 3617 : int nindexes_parallel_bulkdel = 0;
553 : 3617 : int nindexes_parallel_cleanup = 0;
554 : : int parallel_workers;
555 : :
556 : : /*
557 : : * We don't allow performing parallel operation in standalone backend or
558 : : * when parallelism is disabled.
559 : : */
560 [ + + - + ]: 3617 : if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
561 : 1813 : return 0;
562 : :
563 : : /*
564 : : * Compute the number of indexes that can participate in parallel vacuum.
565 : : */
566 [ + + ]: 5894 : for (int i = 0; i < nindexes; i++)
567 : : {
568 : 4090 : Relation indrel = indrels[i];
569 : 4090 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
570 : :
571 : : /* Skip index that is not a suitable target for parallel index vacuum */
572 [ + - ]: 4090 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
573 [ + + ]: 4090 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
574 : 4042 : continue;
575 : :
576 : 48 : will_parallel_vacuum[i] = true;
577 : :
578 [ + + ]: 48 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
579 : 42 : nindexes_parallel_bulkdel++;
580 [ + + ]: 48 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
581 [ + + ]: 36 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
582 : 42 : nindexes_parallel_cleanup++;
583 : : }
584 : :
585 : 1804 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
586 : : nindexes_parallel_cleanup);
587 : :
588 : : /* The leader process takes one index */
589 : 1804 : nindexes_parallel--;
590 : :
591 : : /* No index supports parallel vacuum */
592 [ + + ]: 1804 : if (nindexes_parallel <= 0)
593 : 1795 : return 0;
594 : :
595 : : /* Compute the parallel degree */
596 : 9 : parallel_workers = (nrequested > 0) ?
597 [ + - ]: 9 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
598 : :
599 : : /* Cap by max_parallel_maintenance_workers */
600 : 9 : parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
601 : :
602 : 9 : return parallel_workers;
603 : : }
604 : :
605 : : /*
606 : : * Perform index vacuum or index cleanup with parallel workers. This function
607 : : * must be used by the parallel vacuum leader process.
608 : : */
609 : : static void
610 : 13 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
611 : : bool vacuum)
612 : : {
613 : : int nworkers;
614 : : PVIndVacStatus new_status;
615 : :
616 [ - + ]: 13 : Assert(!IsParallelWorker());
617 : :
618 [ + + ]: 13 : if (vacuum)
619 : : {
620 : 4 : new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
621 : :
622 : : /* Determine the number of parallel workers to launch */
623 : 4 : nworkers = pvs->nindexes_parallel_bulkdel;
624 : : }
625 : : else
626 : : {
627 : 9 : new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
628 : :
629 : : /* Determine the number of parallel workers to launch */
630 : 9 : nworkers = pvs->nindexes_parallel_cleanup;
631 : :
632 : : /* Add conditionally parallel-aware indexes if in the first time call */
633 [ + + ]: 9 : if (num_index_scans == 0)
634 : 5 : nworkers += pvs->nindexes_parallel_condcleanup;
635 : : }
636 : :
637 : : /* The leader process will participate */
638 : 13 : nworkers--;
639 : :
640 : : /*
641 : : * It is possible that parallel context is initialized with fewer workers
642 : : * than the number of indexes that need a separate worker in the current
643 : : * phase, so we need to consider it. See
644 : : * parallel_vacuum_compute_workers().
645 : : */
646 : 13 : nworkers = Min(nworkers, pvs->pcxt->nworkers);
647 : :
648 : : /*
649 : : * Set index vacuum status and mark whether parallel vacuum worker can
650 : : * process it.
651 : : */
652 [ + + ]: 73 : for (int i = 0; i < pvs->nindexes; i++)
653 : : {
654 : 60 : PVIndStats *indstats = &(pvs->indstats[i]);
655 : :
656 [ - + ]: 60 : Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
657 : 60 : indstats->status = new_status;
658 : 60 : indstats->parallel_workers_can_process =
601 659 [ + + + + ]: 114 : (pvs->will_parallel_vacuum[i] &&
843 660 : 54 : parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
661 : : num_index_scans,
843 akapila@postgresql.o 662 :ECB (60) : vacuum));
663 : : }
664 : :
665 : : /* Reset the parallel index processing and progress counters */
843 akapila@postgresql.o 666 :CBC 13 : pg_atomic_write_u32(&(pvs->shared->idx), 0);
667 : :
668 : : /* Setup the shared cost-based vacuum delay and launch workers */
669 [ + + ]: 13 : if (nworkers > 0)
670 : : {
671 : : /* Reinitialize parallel context to relaunch parallel workers */
672 [ + + ]: 10 : if (num_index_scans > 0)
673 : 1 : ReinitializeParallelDSM(pvs->pcxt);
674 : :
675 : : /*
676 : : * Set up shared cost balance and the number of active workers for
677 : : * vacuum delay. We need to do this before launching workers as
678 : : * otherwise, they might not see the updated values for these
679 : : * parameters.
680 : : */
681 : 10 : pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
682 : 10 : pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
683 : :
684 : : /*
685 : : * The number of workers can vary between bulkdelete and cleanup
686 : : * phase.
687 : : */
688 : 10 : ReinitializeParallelWorkers(pvs->pcxt, nworkers);
689 : :
690 : 10 : LaunchParallelWorkers(pvs->pcxt);
691 : :
692 [ + - ]: 10 : if (pvs->pcxt->nworkers_launched > 0)
693 : : {
694 : : /*
695 : : * Reset the local cost values for leader backend as we have
696 : : * already accumulated the remaining balance of heap.
697 : : */
698 : 10 : VacuumCostBalance = 0;
699 : 10 : VacuumCostBalanceLocal = 0;
700 : :
701 : : /* Enable shared cost balance for leader backend */
702 : 10 : VacuumSharedCostBalance = &(pvs->shared->cost_balance);
703 : 10 : VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
704 : : }
705 : :
706 [ + + ]: 10 : if (vacuum)
707 [ - + ]: 4 : ereport(pvs->shared->elevel,
708 : : (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
709 : : "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
710 : : pvs->pcxt->nworkers_launched),
711 : : pvs->pcxt->nworkers_launched, nworkers)));
712 : : else
713 [ - + ]: 6 : ereport(pvs->shared->elevel,
714 : : (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
715 : : "launched %d parallel vacuum workers for index cleanup (planned: %d)",
716 : : pvs->pcxt->nworkers_launched),
717 : : pvs->pcxt->nworkers_launched, nworkers)));
718 : : }
719 : :
720 : : /* Vacuum the indexes that can be processed by only leader process */
721 : 13 : parallel_vacuum_process_unsafe_indexes(pvs);
722 : :
723 : : /*
724 : : * Join as a parallel worker. The leader vacuums alone processes all
725 : : * parallel-safe indexes in the case where no workers are launched.
726 : : */
727 : 13 : parallel_vacuum_process_safe_indexes(pvs);
728 : :
729 : : /*
730 : : * Next, accumulate buffer and WAL usage. (This must wait for the workers
731 : : * to finish, or we might get incomplete data.)
732 : : */
733 [ + + ]: 13 : if (nworkers > 0)
734 : : {
735 : : /* Wait for all vacuum workers to finish */
736 : 10 : WaitForParallelWorkersToFinish(pvs->pcxt);
737 : :
738 [ + + ]: 26 : for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
739 : 16 : InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
740 : : }
741 : :
742 : : /*
743 : : * Reset all index status back to initial (while checking that we have
744 : : * vacuumed all indexes).
745 : : */
746 [ + + ]: 73 : for (int i = 0; i < pvs->nindexes; i++)
747 : : {
748 : 60 : PVIndStats *indstats = &(pvs->indstats[i]);
749 : :
750 [ - + ]: 60 : if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
843 akapila@postgresql.o 751 [ # # ]:UBC 0 : elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
752 : : RelationGetRelationName(pvs->indrels[i]));
753 : :
843 akapila@postgresql.o 754 :CBC 60 : indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
755 : : }
756 : :
757 : : /*
758 : : * Carry the shared balance value to heap scan and disable shared costing
759 : : */
760 [ + + ]: 13 : if (VacuumSharedCostBalance)
761 : : {
762 : 10 : VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
763 : 10 : VacuumSharedCostBalance = NULL;
764 : 10 : VacuumActiveNWorkers = NULL;
765 : : }
766 : 13 : }
767 : :
768 : : /*
769 : : * Index vacuum/cleanup routine run by the leader process and by each parallel
770 : : * vacuum worker; each caller claims index numbers from pvs->shared->idx.
771 : : */
772 : : static void
773 : 29 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
774 : : {
775 : : /*
776 : : * Increment the active worker count if we are able to launch any worker.
777 : : */
778 [ + + ]: 29 : if (VacuumActiveNWorkers)
779 : 26 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
780 : :
781 : : /* Loop until every index number has been claimed */
782 : : for (;;)
783 : 60 : {
784 : : int idx;
785 : : PVIndStats *indstats;
786 : :
787 : : /* Atomically claim the next index number to process */
788 : 89 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
789 : :
790 : : /* Done for all indexes? */
791 [ + + ]: 89 : if (idx >= pvs->nindexes)
792 : 29 : break;
793 : :
794 : 60 : indstats = &(pvs->indstats[idx]);
795 : :
796 : : /*
797 : : * Skip vacuuming an index that is unsafe for workers or is an
798 : : * unsuitable target for parallel index vacuum (it is vacuumed in
799 : : * parallel_vacuum_process_unsafe_indexes() by the leader).
800 : : */
801 [ + + ]: 60 : if (!indstats->parallel_workers_can_process)
802 : 22 : continue;
803 : :
804 : : /* Do vacuum or cleanup of the index */
805 : 38 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
806 : : }
807 : :
808 : : /*
809 : : * We have completed the index vacuum, so decrement the active worker
810 : : * count.
811 : : */
812 [ + + ]: 29 : if (VacuumActiveNWorkers)
813 : 26 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
814 : 29 : }
815 : :
816 : : /*
817 : : * Perform parallel vacuuming of indexes in leader process.
818 : : *
819 : : * Handles index vacuuming (or index cleanup) for indexes that are not
820 : : * parallel safe. It's possible that this will vary for a given index, based
821 : : * on details like whether we're performing index cleanup right now.
822 : : *
823 : : * Also performs vacuuming of smaller indexes that fell under the size cutoff
824 : : * enforced by parallel_vacuum_compute_workers().
825 : : */
826 : : static void
827 : 13 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
828 : : {
829 [ - + ]: 13 : Assert(!IsParallelWorker());
830 : :
831 : : /*
832 : : * Increment the active worker count if we are able to launch any worker.
833 : : */
834 [ + + ]: 13 : if (VacuumActiveNWorkers)
835 : 10 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
836 : :
837 [ + + ]: 73 : for (int i = 0; i < pvs->nindexes; i++)
838 : : {
839 : 60 : PVIndStats *indstats = &(pvs->indstats[i]);
840 : :
841 : : /* Skip indexes that are safe for workers; see parallel_vacuum_process_safe_indexes() */
842 [ + + ]: 60 : if (indstats->parallel_workers_can_process)
843 : 38 : continue;
844 : :
845 : : /* Do vacuum or cleanup of the index */
846 : 22 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
847 : : }
848 : :
849 : : /*
850 : : * We have completed the index vacuum, so decrement the active worker
851 : : * count.
852 : : */
853 [ + + ]: 13 : if (VacuumActiveNWorkers)
854 : 10 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
855 : 13 : }
856 : :
857 : : /*
858 : : * Vacuum or clean up an index, either in the leader process or in one of the
859 : : * worker processes. After vacuuming the index this function copies the index
860 : : * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
861 : : * segment.
862 : : */
863 : : static void
864 : 60 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
865 : : PVIndStats *indstats)
866 : : {
867 : 60 : IndexBulkDeleteResult *istat = NULL;
868 : : IndexBulkDeleteResult *istat_res;
869 : : IndexVacuumInfo ivinfo;
870 : :
871 : : /*
872 : : * Update the pointer to the corresponding bulk-deletion result if someone
873 : : * has already updated it; otherwise istat stays NULL.
874 : : */
875 [ + + ]: 60 : if (indstats->istat_updated)
876 : 15 : istat = &(indstats->istat);
877 : :
878 : 60 : ivinfo.index = indrel;
377 pg@bowt.ie 879 : 60 : ivinfo.heaprel = pvs->heaprel;
843 akapila@postgresql.o 880 : 60 : ivinfo.analyze_only = false;
881 : 60 : ivinfo.report_progress = false;
821 pg@bowt.ie 882 : 60 : ivinfo.message_level = DEBUG2;
843 akapila@postgresql.o 883 : 60 : ivinfo.estimated_count = pvs->shared->estimated_count;
884 : 60 : ivinfo.num_heap_tuples = pvs->shared->reltuples;
885 : 60 : ivinfo.strategy = pvs->bstrategy;
886 : :
887 : : /* Update error traceback information (indname is a copy, freed below) */
888 : 60 : pvs->indname = pstrdup(RelationGetRelationName(indrel));
889 : 60 : pvs->status = indstats->status;
890 : :
891 [ + + - ]: 60 : switch (indstats->status)
892 : : {
893 : 15 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
12 msawada@postgresql.o 894 :GNC 15 : istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
895 : 15 : &pvs->shared->dead_items_info);
843 akapila@postgresql.o 896 :CBC 15 : break;
897 : 45 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
898 : 45 : istat_res = vac_cleanup_one_index(&ivinfo, istat);
899 : 45 : break;
843 akapila@postgresql.o 900 :UBC 0 : default:
901 [ # # ]: 0 : elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
902 : : indstats->status,
903 : : RelationGetRelationName(indrel));
904 : : }
905 : :
906 : : /*
907 : : * Copy the index bulk-deletion result returned from ambulkdelete and
908 : : * amvacuumcleanup to the DSM segment if it's the first cycle, because they
909 : : * allocate locally and it's possible that an index will be vacuumed by a
910 : : * different vacuum process in the next cycle. Copying the result normally
911 : : * happens only the first time an index is vacuumed. For any additional
912 : : * vacuum pass, we directly point to the result on the DSM segment and
913 : : * pass it to vacuum index APIs so that workers can update it directly.
914 : : *
915 : : * Since all vacuum workers write the bulk-deletion result at different
916 : : * slots, we can write them without locking.
917 : : */
843 akapila@postgresql.o 918 [ + + + + ]:CBC 60 : if (!indstats->istat_updated && istat_res != NULL)
919 : : {
920 : 35 : memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
921 : 35 : indstats->istat_updated = true;
922 : :
923 : : /* Free the locally-allocated bulk-deletion result */
924 : 35 : pfree(istat_res);
925 : : }
926 : :
927 : : /*
928 : : * Update the status to completed. No need to lock here since each worker
929 : : * touches different indexes.
930 : : */
931 : 60 : indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
932 : :
933 : : /* Reset error traceback information */
934 : 60 : pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
935 : 60 : pfree(pvs->indname);
936 : 60 : pvs->indname = NULL;
937 : :
938 : : /*
939 : : * Call the parallel variant of pgstat_progress_incr_param so workers can
940 : : * report progress of index vacuum to the leader.
941 : : */
278 msawada@postgresql.o 942 :GNC 60 : pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
843 akapila@postgresql.o 943 :CBC 60 : }
944 : :
945 : : /*
946 : : * Returns false if the given index can't participate in the next execution of
947 : : * parallel index vacuum (vacuum == true) or parallel index cleanup (false).
948 : : */
949 : : static bool
950 : 54 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
951 : : bool vacuum)
952 : : {
953 : : uint8 vacoptions;
954 : :
955 : 54 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
956 : :
957 : : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
958 [ + + ]: 54 : if (vacuum)
959 : 12 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
960 : :
961 : : /* Not safe if the index supports neither form of parallel cleanup */
962 [ + + ]: 42 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
963 [ + + ]: 30 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
964 : 6 : return false;
965 : :
966 : : /*
967 : : * Not safe if the index supports parallel cleanup conditionally, but we
968 : : * have already processed the index (for bulkdelete). We do this to avoid
969 : : * the need to invoke workers when parallel index cleanup doesn't need to
970 : : * scan the index. See the comments for option
971 : : * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
972 : : * parallel cleanup conditionally.
973 : : */
974 [ + + ]: 36 : if (num_index_scans > 0 &&
975 [ + + ]: 11 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
976 : 9 : return false;
977 : :
978 : 27 : return true;
979 : : }
980 : :
981 : : /*
982 : : * Perform work within a launched parallel process.
983 : : *
984 : : * Since parallel vacuum workers perform only index vacuum or index cleanup,
985 : : * we don't need to report progress information.
986 : : */
987 : : void
988 : 16 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
989 : : {
990 : : ParallelVacuumState pvs;
991 : : Relation rel;
992 : : Relation *indrels;
993 : : PVIndStats *indstats;
994 : : PVShared *shared;
995 : : TidStore *dead_items;
996 : : BufferUsage *buffer_usage;
997 : : WalUsage *wal_usage;
998 : : int nindexes;
999 : : char *sharedquery;
1000 : : ErrorContextCallback errcallback;
1001 : :
1002 : : /*
1003 : : * A parallel vacuum worker must have only the PROC_IN_VACUUM flag set, since
1004 : : * we don't support parallel vacuum for autovacuum as of now.
1005 : : */
1006 [ - + ]: 16 : Assert(MyProc->statusFlags == PROC_IN_VACUUM);
1007 : :
1008 [ - + ]: 16 : elog(DEBUG1, "starting parallel vacuum worker");
1009 : :
1010 : 16 : shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
1011 : :
1012 : : /* Set debug_query_string for individual workers */
1013 : 16 : sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
1014 : 16 : debug_query_string = sharedquery;
1015 : 16 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1016 : :
1017 : : /*
1018 : : * Open table. The lock mode is the same as the leader process. It's
1019 : : * okay because the lock mode does not conflict among the parallel
1020 : : * workers.
1021 : : */
1022 : 16 : rel = table_open(shared->relid, ShareUpdateExclusiveLock);
1023 : :
1024 : : /*
1025 : : * Open all indexes. indrels are sorted in order by OID, which should
1026 : : * match the leader's ordering.
1027 : : */
1028 : 16 : vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
1029 [ - + ]: 16 : Assert(nindexes > 0);
1030 : :
1031 [ + - ]: 16 : if (shared->maintenance_work_mem_worker > 0)
1032 : 16 : maintenance_work_mem = shared->maintenance_work_mem_worker;
1033 : :
1034 : : /* Set index statistics */
1035 : 16 : indstats = (PVIndStats *) shm_toc_lookup(toc,
1036 : : PARALLEL_VACUUM_KEY_INDEX_STATS,
1037 : : false);
1038 : :
1039 : : /* Find dead_items in shared memory */
12 msawada@postgresql.o 1040 :GNC 16 : dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
1041 : : shared->dead_items_handle);
1042 : :
1043 : : /* Set cost-based vacuum delay */
373 dgustafsson@postgres 1044 :CBC 16 : VacuumUpdateCosts();
843 akapila@postgresql.o 1045 : 16 : VacuumCostBalance = 0;
1046 : 16 : VacuumPageHit = 0;
1047 : 16 : VacuumPageMiss = 0;
1048 : 16 : VacuumPageDirty = 0;
1049 : 16 : VacuumCostBalanceLocal = 0;
1050 : 16 : VacuumSharedCostBalance = &(shared->cost_balance);
1051 : 16 : VacuumActiveNWorkers = &(shared->active_nworkers);
1052 : :
1053 : : /* Set parallel vacuum state */
1054 : 16 : pvs.indrels = indrels;
1055 : 16 : pvs.nindexes = nindexes;
1056 : 16 : pvs.indstats = indstats;
1057 : 16 : pvs.shared = shared;
1058 : 16 : pvs.dead_items = dead_items;
1059 : 16 : pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
1060 : 16 : pvs.relname = pstrdup(RelationGetRelationName(rel));
379 andres@anarazel.de 1061 : 16 : pvs.heaprel = rel;
1062 : :
1063 : : /* These fields will be filled during index vacuum or cleanup */
843 akapila@postgresql.o 1064 : 16 : pvs.indname = NULL;
1065 : 16 : pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
1066 : :
1067 : : /* Each parallel VACUUM worker gets its own access strategy. */
373 drowley@postgresql.o 1068 : 32 : pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
1069 : 16 : shared->ring_nbuffers * (BLCKSZ / 1024));
1070 : :
1071 : : /* Set up error traceback support for ereport() */
843 akapila@postgresql.o 1072 : 16 : errcallback.callback = parallel_vacuum_error_callback;
1073 : 16 : errcallback.arg = &pvs;
1074 : 16 : errcallback.previous = error_context_stack;
1075 : 16 : error_context_stack = &errcallback;
1076 : :
1077 : : /* Prepare to track buffer usage during parallel execution */
1078 : 16 : InstrStartParallelQuery();
1079 : :
1080 : : /* Process indexes to perform vacuum/cleanup */
1081 : 16 : parallel_vacuum_process_safe_indexes(&pvs);
1082 : :
1083 : : /* Report buffer/WAL usage during parallel execution */
1084 : 16 : buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1085 : 16 : wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
1086 : 16 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1087 : 16 : &wal_usage[ParallelWorkerNumber]);
1088 : :
12 msawada@postgresql.o 1089 :GNC 16 : TidStoreDetach(dead_items);
1090 : :
1091 : : /* Pop the error context stack */
843 akapila@postgresql.o 1092 :CBC 16 : error_context_stack = errcallback.previous;
1093 : :
1094 : 16 : vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1095 : 16 : table_close(rel, ShareUpdateExclusiveLock);
1096 : 16 : FreeAccessStrategy(pvs.bstrategy);
1097 : 16 : }
1098 : :
1099 : : /*
1100 : : * Error context callback for errors occurring during parallel index vacuum.
1101 : : * The error context messages should match the messages set in the lazy vacuum
1102 : : * error context. If you change this function, change vacuum_error_callback()
1103 : : * as well. arg points to the ParallelVacuumState whose status is reported.
1104 : : */
1105 : : static void
843 akapila@postgresql.o 1106 :UBC 0 : parallel_vacuum_error_callback(void *arg)
1107 : : {
1108 : 0 : ParallelVacuumState *errinfo = arg;
1109 : :
1110 [ # # # ]: 0 : switch (errinfo->status)
1111 : : {
1112 : 0 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1113 : 0 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1114 : : errinfo->indname,
1115 : : errinfo->relnamespace,
1116 : : errinfo->relname);
1117 : 0 : break;
1118 : 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1119 : 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1120 : : errinfo->indname,
1121 : : errinfo->relnamespace,
1122 : : errinfo->relname);
1123 : 0 : break;
1124 : 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1125 : : case PARALLEL_INDVAC_STATUS_COMPLETED:
1126 : : default:
1127 : 0 : return;
1128 : : }
1129 : : }
|