Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * vacuumparallel.c
4 : * Support routines for parallel vacuum execution.
5 : *
6 : * This file contains routines that are intended to support setting up, using,
7 : * and tearing down a ParallelVacuumState.
8 : *
9 : * In a parallel vacuum, we perform both index bulk deletion and index cleanup
10 : * with parallel worker processes. Individual indexes are processed by one
11 : * vacuum process. ParallelVacuumState contains shared information as well as
12 : * the memory space for storing dead items allocated in the DSM segment. We
13 : * launch parallel worker processes at the start of parallel index
14 : * bulk-deletion and index cleanup and once all indexes are processed, the
15 : * parallel worker processes exit. Each time we process indexes in parallel,
16 : * the parallel context is re-initialized so that the same DSM can be used for
17 : * multiple passes of index bulk-deletion and index cleanup.
18 : *
19 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
20 : * Portions Copyright (c) 1994, Regents of the University of California
21 : *
22 : * IDENTIFICATION
23 : * src/backend/commands/vacuumparallel.c
24 : *
25 : *-------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include "access/amapi.h"
30 : #include "access/table.h"
31 : #include "access/xact.h"
32 : #include "catalog/index.h"
33 : #include "commands/vacuum.h"
34 : #include "optimizer/paths.h"
35 : #include "pgstat.h"
36 : #include "storage/bufmgr.h"
37 : #include "tcop/tcopprot.h"
38 : #include "utils/lsyscache.h"
39 : #include "utils/rel.h"
40 :
/*
 * Table-of-contents keys for the chunks that parallel vacuum stores in its
 * DSM segment.  These keys never have to coexist with plan_node_id values
 * (unlike other parallel execution code), so small integers are fine.
 */
#define PARALLEL_VACUUM_KEY_SHARED			1
#define PARALLEL_VACUUM_KEY_DEAD_ITEMS		2
#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
52 :
53 : /*
54 : * Shared information among parallel workers. So this is allocated in the DSM
55 : * segment.
56 : */
57 : typedef struct PVShared
58 : {
59 : /*
60 : * Target table relid and log level (for messages about parallel workers
61 : * launched during VACUUM VERBOSE). These fields are not modified during
62 : * the parallel vacuum.
63 : */
64 : Oid relid;
65 : int elevel;
66 :
67 : /*
68 : * Fields for both index vacuum and cleanup.
69 : *
70 : * reltuples is the total number of input heap tuples. We set either old
71 : * live tuples in the index vacuum case or the new live tuples in the
72 : * index cleanup case.
73 : *
74 : * estimated_count is true if reltuples is an estimated value. (Note that
75 : * reltuples could be -1 in this case, indicating we have no idea.)
76 : */
77 : double reltuples;
78 : bool estimated_count;
79 :
80 : /*
81 : * In single process vacuum we could consume more memory during index
82 : * vacuuming or cleanup apart from the memory for heap scanning. In
83 : * parallel vacuum, since individual vacuum workers can consume memory
84 : * equal to maintenance_work_mem, the new maintenance_work_mem for each
85 : * worker is set such that the parallel operation doesn't consume more
86 : * memory than single process vacuum.
87 : */
88 : int maintenance_work_mem_worker;
89 :
90 : /*
91 : * The number of buffers each worker's Buffer Access Strategy ring should
92 : * contain.
93 : */
94 : int ring_nbuffers;
95 :
96 : /*
97 : * Shared vacuum cost balance. During parallel vacuum,
98 : * VacuumSharedCostBalance points to this value and it accumulates the
99 : * balance of each parallel vacuum worker.
100 : */
101 : pg_atomic_uint32 cost_balance;
102 :
103 : /*
104 : * Number of active parallel workers. This is used for computing the
105 : * minimum threshold of the vacuum cost balance before a worker sleeps for
106 : * cost-based delay.
107 : */
108 : pg_atomic_uint32 active_nworkers;
109 :
110 : /* Counter for vacuuming and cleanup */
111 : pg_atomic_uint32 idx;
112 : } PVShared;
113 :
/* Per-index progress marker used during parallel index vacuum or cleanup */
typedef enum PVIndVacStatus
{
	PARALLEL_INDVAC_STATUS_INITIAL = 0,		/* nothing requested yet */
	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, /* bulk-deletion requested */
	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,	/* cleanup requested */
	PARALLEL_INDVAC_STATUS_COMPLETED		/* requested work is finished */
} PVIndVacStatus;
122 :
123 : /*
124 : * Struct for index vacuum statistics of an index that is used for parallel vacuum.
125 : * This includes the status of parallel index vacuum as well as index statistics.
126 : */
127 : typedef struct PVIndStats
128 : {
129 : /*
130 : * The following two fields are set by leader process before executing
131 : * parallel index vacuum or parallel index cleanup. These fields are not
132 : * fixed for the entire VACUUM operation. They are only fixed for an
133 : * individual parallel index vacuum and cleanup.
134 : *
135 : * parallel_workers_can_process is true if both leader and worker can
136 : * process the index, otherwise only leader can process it.
137 : */
138 : PVIndVacStatus status;
139 : bool parallel_workers_can_process;
140 :
141 : /*
142 : * Individual worker or leader stores the result of index vacuum or
143 : * cleanup.
144 : */
145 : bool istat_updated; /* are the stats updated? */
146 : IndexBulkDeleteResult istat;
147 : } PVIndStats;
148 :
149 : /*
150 : * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
151 : */
152 : struct ParallelVacuumState
153 : {
154 : /* NULL for worker processes */
155 : ParallelContext *pcxt;
156 :
157 : /* Parent Heap Relation */
158 : Relation heaprel;
159 :
160 : /* Target indexes */
161 : Relation *indrels;
162 : int nindexes;
163 :
164 : /* Shared information among parallel vacuum workers */
165 : PVShared *shared;
166 :
167 : /*
168 : * Shared index statistics among parallel vacuum workers. The array
169 : * element is allocated for every index, even those indexes where parallel
170 : * index vacuuming is unsafe or not worthwhile (e.g.,
171 : * will_parallel_vacuum[] is false). During parallel vacuum,
172 : * IndexBulkDeleteResult of each index is kept in DSM and is copied into
173 : * local memory at the end of parallel vacuum.
174 : */
175 : PVIndStats *indstats;
176 :
177 : /* Shared dead items space among parallel vacuum workers */
178 : VacDeadItems *dead_items;
179 :
180 : /* Points to buffer usage area in DSM */
181 : BufferUsage *buffer_usage;
182 :
183 : /* Points to WAL usage area in DSM */
184 : WalUsage *wal_usage;
185 :
186 : /*
187 : * False if the index is totally unsuitable target for all parallel
188 : * processing. For example, the index could be <
189 : * min_parallel_index_scan_size cutoff.
190 : */
191 : bool *will_parallel_vacuum;
192 :
193 : /*
194 : * The number of indexes that support parallel index bulk-deletion and
195 : * parallel index cleanup respectively.
196 : */
197 : int nindexes_parallel_bulkdel;
198 : int nindexes_parallel_cleanup;
199 : int nindexes_parallel_condcleanup;
200 :
201 : /* Buffer access strategy used by leader process */
202 : BufferAccessStrategy bstrategy;
203 :
204 : /*
205 : * Error reporting state. The error callback is set only for workers
206 : * processes during parallel index vacuum.
207 : */
208 : char *relnamespace;
209 : char *relname;
210 : char *indname;
211 : PVIndVacStatus status;
212 : };
213 :
214 : static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
215 : bool *will_parallel_vacuum);
216 : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
217 : bool vacuum);
218 : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
219 : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
220 : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
221 : PVIndStats *indstats);
222 : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
223 : bool vacuum);
224 : static void parallel_vacuum_error_callback(void *arg);
225 :
226 : /*
227 : * Try to enter parallel mode and create a parallel context. Then initialize
228 : * shared memory state.
229 : *
230 : * On success, return parallel vacuum state. Otherwise return NULL.
231 : */
232 : ParallelVacuumState *
472 akapila 233 GIC 15832 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
234 : int nrequested_workers, int max_items,
235 : int elevel, BufferAccessStrategy bstrategy)
236 : {
237 : ParallelVacuumState *pvs;
238 : ParallelContext *pcxt;
239 : PVShared *shared;
240 : VacDeadItems *dead_items;
241 : PVIndStats *indstats;
472 akapila 242 ECB : BufferUsage *buffer_usage;
243 : WalUsage *wal_usage;
244 : bool *will_parallel_vacuum;
245 : Size est_indstats_len;
246 : Size est_shared_len;
247 : Size est_dead_items_len;
472 akapila 248 GIC 15832 : int nindexes_mwm = 0;
249 15832 : int parallel_workers = 0;
250 : int querylen;
251 :
252 : /*
253 : * A parallel vacuum must be requested and there must be indexes on the
254 : * relation
255 : */
256 15832 : Assert(nrequested_workers >= 0);
472 akapila 257 CBC 15832 : Assert(nindexes > 0);
472 akapila 258 ECB :
259 : /*
260 : * Compute the number of parallel vacuum workers to launch
261 : */
472 akapila 262 GIC 15832 : will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
263 15832 : parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
264 : nrequested_workers,
472 akapila 265 ECB : will_parallel_vacuum);
472 akapila 266 CBC 15832 : if (parallel_workers <= 0)
267 : {
268 : /* Can't perform vacuum in parallel -- return NULL */
472 akapila 269 GIC 15823 : pfree(will_parallel_vacuum);
270 15823 : return NULL;
472 akapila 271 ECB : }
272 :
472 akapila 273 GIC 9 : pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
274 9 : pvs->indrels = indrels;
472 akapila 275 CBC 9 : pvs->nindexes = nindexes;
472 akapila 276 GIC 9 : pvs->will_parallel_vacuum = will_parallel_vacuum;
277 9 : pvs->bstrategy = bstrategy;
8 andres 278 GNC 9 : pvs->heaprel = rel;
472 akapila 279 ECB :
472 akapila 280 CBC 9 : EnterParallelMode();
472 akapila 281 GIC 9 : pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
282 : parallel_workers);
472 akapila 283 CBC 9 : Assert(pcxt->nworkers > 0);
284 9 : pvs->pcxt = pcxt;
472 akapila 285 ECB :
286 : /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
472 akapila 287 CBC 9 : est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
288 9 : shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
472 akapila 289 GIC 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
472 akapila 290 ECB :
291 : /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
472 akapila 292 GIC 9 : est_shared_len = sizeof(PVShared);
472 akapila 293 CBC 9 : shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
294 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
295 :
296 : /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
297 9 : est_dead_items_len = vac_max_items_to_alloc_size(max_items);
298 9 : shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
299 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
300 :
301 : /*
472 akapila 302 ECB : * Estimate space for BufferUsage and WalUsage --
303 : * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
304 : *
305 : * If there are no extensions loaded that care, we could skip this. We
306 : * have no way of knowing whether anyone's looking at pgBufferUsage or
307 : * pgWalUsage, so do it unconditionally.
308 : */
472 akapila 309 CBC 9 : shm_toc_estimate_chunk(&pcxt->estimator,
310 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
472 akapila 311 GIC 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
312 9 : shm_toc_estimate_chunk(&pcxt->estimator,
313 : mul_size(sizeof(WalUsage), pcxt->nworkers));
314 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
315 :
316 : /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
317 9 : if (debug_query_string)
318 : {
472 akapila 319 CBC 9 : querylen = strlen(debug_query_string);
472 akapila 320 GIC 9 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
472 akapila 321 CBC 9 : shm_toc_estimate_keys(&pcxt->estimator, 1);
472 akapila 322 ECB : }
323 : else
472 akapila 324 LBC 0 : querylen = 0; /* keep compiler quiet */
325 :
472 akapila 326 GIC 9 : InitializeParallelDSM(pcxt);
472 akapila 327 ECB :
328 : /* Prepare index vacuum stats */
472 akapila 329 CBC 9 : indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
234 pg 330 279 : MemSet(indstats, 0, est_indstats_len);
472 akapila 331 54 : for (int i = 0; i < nindexes; i++)
332 : {
472 akapila 333 GIC 45 : Relation indrel = indrels[i];
472 akapila 334 GBC 45 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
335 :
472 akapila 336 ECB : /*
337 : * Cleanup option should be either disabled, always performing in
338 : * parallel or conditionally performing in parallel.
339 : */
472 akapila 340 CBC 45 : Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
472 akapila 341 ECB : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
472 akapila 342 GIC 45 : Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
472 akapila 343 ECB :
472 akapila 344 CBC 45 : if (!will_parallel_vacuum[i])
472 akapila 345 GIC 3 : continue;
346 :
347 42 : if (indrel->rd_indam->amusemaintenanceworkmem)
348 6 : nindexes_mwm++;
349 :
472 akapila 350 ECB : /*
351 : * Remember the number of indexes that support parallel operation for
352 : * each phase.
353 : */
472 akapila 354 CBC 42 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
355 36 : pvs->nindexes_parallel_bulkdel++;
472 akapila 356 GIC 42 : if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
472 akapila 357 CBC 12 : pvs->nindexes_parallel_cleanup++;
358 42 : if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
472 akapila 359 GIC 24 : pvs->nindexes_parallel_condcleanup++;
360 : }
361 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
362 9 : pvs->indstats = indstats;
363 :
472 akapila 364 ECB : /* Prepare shared information */
472 akapila 365 CBC 9 : shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
366 54 : MemSet(shared, 0, est_shared_len);
367 9 : shared->relid = RelationGetRelid(rel);
368 9 : shared->elevel = elevel;
369 9 : shared->maintenance_work_mem_worker =
370 : (nindexes_mwm > 0) ?
371 9 : maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
472 akapila 372 ECB : maintenance_work_mem;
373 :
374 : /* Use the same buffer size for all workers */
2 drowley 375 GNC 9 : shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
376 :
472 akapila 377 GIC 9 : pg_atomic_init_u32(&(shared->cost_balance), 0);
472 akapila 378 CBC 9 : pg_atomic_init_u32(&(shared->active_nworkers), 0);
379 9 : pg_atomic_init_u32(&(shared->idx), 0);
472 akapila 380 ECB :
472 akapila 381 CBC 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
382 9 : pvs->shared = shared;
383 :
472 akapila 384 ECB : /* Prepare the dead_items space */
472 akapila 385 GIC 9 : dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
386 : est_dead_items_len);
387 9 : dead_items->max_items = max_items;
472 akapila 388 CBC 9 : dead_items->num_items = 0;
472 akapila 389 GIC 9 : MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
472 akapila 390 CBC 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
391 9 : pvs->dead_items = dead_items;
472 akapila 392 ECB :
393 : /*
394 : * Allocate space for each worker's BufferUsage and WalUsage; no need to
395 : * initialize
396 : */
472 akapila 397 GIC 9 : buffer_usage = shm_toc_allocate(pcxt->toc,
472 akapila 398 CBC 9 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
472 akapila 399 GIC 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
472 akapila 400 CBC 9 : pvs->buffer_usage = buffer_usage;
401 9 : wal_usage = shm_toc_allocate(pcxt->toc,
402 9 : mul_size(sizeof(WalUsage), pcxt->nworkers));
403 9 : shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
404 9 : pvs->wal_usage = wal_usage;
405 :
406 : /* Store query string for workers */
472 akapila 407 GIC 9 : if (debug_query_string)
408 : {
409 : char *sharedquery;
472 akapila 410 ECB :
472 akapila 411 CBC 9 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
412 9 : memcpy(sharedquery, debug_query_string, querylen + 1);
413 9 : sharedquery[querylen] = '\0';
414 9 : shm_toc_insert(pcxt->toc,
472 akapila 415 ECB : PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
416 : }
417 :
418 : /* Success -- return parallel vacuum state */
472 akapila 419 GIC 9 : return pvs;
472 akapila 420 ECB : }
421 :
422 : /*
423 : * Destroy the parallel context, and end parallel mode.
424 : *
425 : * Since writes are not allowed during parallel mode, copy the
426 : * updated index statistics from DSM into local memory and then later use that
427 : * to update the index statistics. One might think that we can exit from
428 : * parallel mode, update the index statistics and then destroy parallel
429 : * context, but that won't be safe (see ExitParallelMode).
430 : */
431 : void
472 akapila 432 CBC 9 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
433 : {
472 akapila 434 GIC 9 : Assert(!IsParallelWorker());
435 :
436 : /* Copy the updated statistics */
437 54 : for (int i = 0; i < pvs->nindexes; i++)
438 : {
439 45 : PVIndStats *indstats = &(pvs->indstats[i]);
440 :
441 45 : if (indstats->istat_updated)
442 : {
443 37 : istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
444 37 : memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
472 akapila 445 ECB : }
446 : else
472 akapila 447 CBC 8 : istats[i] = NULL;
448 : }
449 :
450 9 : DestroyParallelContext(pvs->pcxt);
472 akapila 451 GIC 9 : ExitParallelMode();
472 akapila 452 ECB :
472 akapila 453 GIC 9 : pfree(pvs->will_parallel_vacuum);
472 akapila 454 CBC 9 : pfree(pvs);
472 akapila 455 GIC 9 : }
472 akapila 456 ECB :
457 : /* Returns the dead items space */
458 : VacDeadItems *
472 akapila 459 GIC 9 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
472 akapila 460 ECB : {
472 akapila 461 GIC 9 : return pvs->dead_items;
462 : }
472 akapila 463 ECB :
464 : /*
465 : * Do parallel index bulk-deletion with parallel workers.
466 : */
467 : void
472 akapila 468 CBC 5 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
469 : int num_index_scans)
470 : {
472 akapila 471 GIC 5 : Assert(!IsParallelWorker());
472 akapila 472 ECB :
473 : /*
474 : * We can only provide an approximate value of num_heap_tuples, at least
475 : * for now.
476 : */
472 akapila 477 GIC 5 : pvs->shared->reltuples = num_table_tuples;
478 5 : pvs->shared->estimated_count = true;
479 :
480 5 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
472 akapila 481 CBC 5 : }
482 :
483 : /*
472 akapila 484 ECB : * Do parallel index cleanup with parallel workers.
485 : */
486 : void
472 akapila 487 GIC 9 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
488 : int num_index_scans, bool estimated_count)
489 : {
472 akapila 490 CBC 9 : Assert(!IsParallelWorker());
472 akapila 491 ECB :
492 : /*
493 : * We can provide a better estimate of total number of surviving tuples
494 : * (we assume indexes are more interested in that than in the number of
495 : * nominally live tuples).
496 : */
472 akapila 497 GIC 9 : pvs->shared->reltuples = num_table_tuples;
498 9 : pvs->shared->estimated_count = estimated_count;
499 :
472 akapila 500 CBC 9 : parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
472 akapila 501 GIC 9 : }
502 :
472 akapila 503 ECB : /*
504 : * Compute the number of parallel worker processes to request. Both index
505 : * vacuum and index cleanup can be executed with parallel workers.
506 : * The index is eligible for parallel vacuum iff its size is greater than
507 : * min_parallel_index_scan_size as invoking workers for very small indexes
508 : * can hurt performance.
509 : *
510 : * nrequested is the number of parallel workers that user requested. If
511 : * nrequested is 0, we compute the parallel degree based on nindexes, that is
512 : * the number of indexes that support parallel vacuum. This function also
513 : * sets will_parallel_vacuum to remember indexes that participate in parallel
514 : * vacuum.
515 : */
516 : static int
472 akapila 517 GIC 15832 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
518 : bool *will_parallel_vacuum)
519 : {
520 15832 : int nindexes_parallel = 0;
521 15832 : int nindexes_parallel_bulkdel = 0;
522 15832 : int nindexes_parallel_cleanup = 0;
523 : int parallel_workers;
524 :
525 : /*
526 : * We don't allow performing parallel operation in standalone backend or
527 : * when parallelism is disabled.
528 : */
529 15832 : if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
472 akapila 530 CBC 14847 : return 0;
531 :
532 : /*
472 akapila 533 ECB : * Compute the number of indexes that can participate in parallel vacuum.
534 : */
472 akapila 535 CBC 3231 : for (int i = 0; i < nindexes; i++)
536 : {
472 akapila 537 GIC 2246 : Relation indrel = indrels[i];
538 2246 : uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
539 :
540 : /* Skip index that is not a suitable target for parallel index vacuum */
541 2246 : if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
472 akapila 542 CBC 2246 : RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
543 2200 : continue;
544 :
472 akapila 545 GIC 46 : will_parallel_vacuum[i] = true;
546 :
547 46 : if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
472 akapila 548 CBC 40 : nindexes_parallel_bulkdel++;
472 akapila 549 GIC 46 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
472 akapila 550 CBC 34 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
551 40 : nindexes_parallel_cleanup++;
552 : }
553 :
554 985 : nindexes_parallel = Max(nindexes_parallel_bulkdel,
472 akapila 555 ECB : nindexes_parallel_cleanup);
556 :
557 : /* The leader process takes one index */
472 akapila 558 CBC 985 : nindexes_parallel--;
559 :
472 akapila 560 ECB : /* No index supports parallel vacuum */
472 akapila 561 CBC 985 : if (nindexes_parallel <= 0)
562 976 : return 0;
472 akapila 563 ECB :
564 : /* Compute the parallel degree */
472 akapila 565 GIC 9 : parallel_workers = (nrequested > 0) ?
566 9 : Min(nrequested, nindexes_parallel) : nindexes_parallel;
472 akapila 567 ECB :
568 : /* Cap by max_parallel_maintenance_workers */
472 akapila 569 GIC 9 : parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
570 :
472 akapila 571 CBC 9 : return parallel_workers;
572 : }
573 :
472 akapila 574 ECB : /*
575 : * Perform index vacuum or index cleanup with parallel workers. This function
576 : * must be used by the parallel vacuum leader process.
577 : */
578 : static void
472 akapila 579 CBC 14 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
580 : bool vacuum)
581 : {
472 akapila 582 ECB : int nworkers;
583 : PVIndVacStatus new_status;
584 :
472 akapila 585 GIC 14 : Assert(!IsParallelWorker());
586 :
587 14 : if (vacuum)
588 : {
589 5 : new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
590 :
591 : /* Determine the number of parallel workers to launch */
472 akapila 592 CBC 5 : nworkers = pvs->nindexes_parallel_bulkdel;
593 : }
594 : else
595 : {
472 akapila 596 GIC 9 : new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
597 :
472 akapila 598 ECB : /* Determine the number of parallel workers to launch */
472 akapila 599 GIC 9 : nworkers = pvs->nindexes_parallel_cleanup;
472 akapila 600 ECB :
601 : /* Add conditionally parallel-aware indexes if in the first time call */
472 akapila 602 CBC 9 : if (num_index_scans == 0)
472 akapila 603 GIC 4 : nworkers += pvs->nindexes_parallel_condcleanup;
604 : }
472 akapila 605 ECB :
606 : /* The leader process will participate */
472 akapila 607 GIC 14 : nworkers--;
608 :
472 akapila 609 ECB : /*
610 : * It is possible that parallel context is initialized with fewer workers
611 : * than the number of indexes that need a separate worker in the current
612 : * phase, so we need to consider it. See
613 : * parallel_vacuum_compute_workers().
614 : */
472 akapila 615 CBC 14 : nworkers = Min(nworkers, pvs->pcxt->nworkers);
472 akapila 616 ECB :
617 : /*
618 : * Set index vacuum status and mark whether parallel vacuum worker can
619 : * process it.
620 : */
472 akapila 621 GIC 80 : for (int i = 0; i < pvs->nindexes; i++)
622 : {
623 66 : PVIndStats *indstats = &(pvs->indstats[i]);
624 :
625 66 : Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
626 66 : indstats->status = new_status;
627 66 : indstats->parallel_workers_can_process =
230 akapila 628 CBC 126 : (pvs->will_parallel_vacuum[i] &&
472 akapila 629 GIC 60 : parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
630 : num_index_scans,
631 : vacuum));
632 : }
633 :
472 akapila 634 ECB : /* Reset the parallel index processing counter */
472 akapila 635 GIC 14 : pg_atomic_write_u32(&(pvs->shared->idx), 0);
472 akapila 636 ECB :
637 : /* Setup the shared cost-based vacuum delay and launch workers */
472 akapila 638 CBC 14 : if (nworkers > 0)
472 akapila 639 ECB : {
640 : /* Reinitialize parallel context to relaunch parallel workers */
472 akapila 641 CBC 11 : if (num_index_scans > 0)
642 2 : ReinitializeParallelDSM(pvs->pcxt);
643 :
644 : /*
645 : * Set up shared cost balance and the number of active workers for
646 : * vacuum delay. We need to do this before launching workers as
647 : * otherwise, they might not see the updated values for these
472 akapila 648 ECB : * parameters.
649 : */
472 akapila 650 GIC 11 : pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
472 akapila 651 CBC 11 : pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
652 :
653 : /*
472 akapila 654 ECB : * The number of workers can vary between bulkdelete and cleanup
655 : * phase.
656 : */
472 akapila 657 GIC 11 : ReinitializeParallelWorkers(pvs->pcxt, nworkers);
658 :
659 11 : LaunchParallelWorkers(pvs->pcxt);
660 :
661 11 : if (pvs->pcxt->nworkers_launched > 0)
662 : {
472 akapila 663 ECB : /*
664 : * Reset the local cost values for leader backend as we have
665 : * already accumulated the remaining balance of heap.
666 : */
472 akapila 667 GIC 11 : VacuumCostBalance = 0;
668 11 : VacuumCostBalanceLocal = 0;
669 :
472 akapila 670 ECB : /* Enable shared cost balance for leader backend */
472 akapila 671 GIC 11 : VacuumSharedCostBalance = &(pvs->shared->cost_balance);
472 akapila 672 CBC 11 : VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
673 : }
472 akapila 674 ECB :
472 akapila 675 GIC 11 : if (vacuum)
676 5 : ereport(pvs->shared->elevel,
677 : (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
678 : "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
679 : pvs->pcxt->nworkers_launched),
472 akapila 680 ECB : pvs->pcxt->nworkers_launched, nworkers)));
681 : else
472 akapila 682 GIC 6 : ereport(pvs->shared->elevel,
683 : (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
472 akapila 684 ECB : "launched %d parallel vacuum workers for index cleanup (planned: %d)",
685 : pvs->pcxt->nworkers_launched),
686 : pvs->pcxt->nworkers_launched, nworkers)));
687 : }
688 :
689 : /* Vacuum the indexes that can be processed by only leader process */
472 akapila 690 GIC 14 : parallel_vacuum_process_unsafe_indexes(pvs);
691 :
692 : /*
693 : * Join as a parallel worker. The leader vacuums alone processes all
694 : * parallel-safe indexes in the case where no workers are launched.
472 akapila 695 ECB : */
472 akapila 696 GIC 14 : parallel_vacuum_process_safe_indexes(pvs);
697 :
698 : /*
699 : * Next, accumulate buffer and WAL usage. (This must wait for the workers
700 : * to finish, or we might get incomplete data.)
701 : */
702 14 : if (nworkers > 0)
472 akapila 703 ECB : {
704 : /* Wait for all vacuum workers to finish */
472 akapila 705 GIC 11 : WaitForParallelWorkersToFinish(pvs->pcxt);
706 :
707 28 : for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
708 17 : InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
472 akapila 709 ECB : }
710 :
711 : /*
712 : * Reset all index status back to initial (while checking that we have
713 : * vacuumed all indexes).
714 : */
472 akapila 715 CBC 80 : for (int i = 0; i < pvs->nindexes; i++)
716 : {
472 akapila 717 GIC 66 : PVIndStats *indstats = &(pvs->indstats[i]);
472 akapila 718 ECB :
472 akapila 719 GIC 66 : if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
472 akapila 720 LBC 0 : elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
472 akapila 721 ECB : RelationGetRelationName(pvs->indrels[i]));
722 :
472 akapila 723 GIC 66 : indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
724 : }
725 :
726 : /*
727 : * Carry the shared balance value to heap scan and disable shared costing
472 akapila 728 ECB : */
472 akapila 729 GIC 14 : if (VacuumSharedCostBalance)
472 akapila 730 ECB : {
472 akapila 731 GIC 11 : VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
472 akapila 732 CBC 11 : VacuumSharedCostBalance = NULL;
472 akapila 733 GBC 11 : VacuumActiveNWorkers = NULL;
734 : }
472 akapila 735 GIC 14 : }
472 akapila 736 ECB :
737 : /*
738 : * Index vacuum/cleanup routine used by the leader process and parallel
739 : * vacuum worker processes to vacuum the indexes in parallel.
740 : */
741 : static void
472 akapila 742 CBC 31 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
743 : {
472 akapila 744 ECB : /*
745 : * Increment the active worker count if we are able to launch any worker.
746 : */
472 akapila 747 GIC 31 : if (VacuumActiveNWorkers)
472 akapila 748 CBC 28 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
749 :
750 : /* Loop until all indexes are vacuumed */
751 : for (;;)
472 akapila 752 GIC 66 : {
753 : int idx;
754 : PVIndStats *indstats;
472 akapila 755 ECB :
756 : /* Get an index number to process */
472 akapila 757 GIC 97 : idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
758 :
759 : /* Done for all indexes? */
472 akapila 760 CBC 97 : if (idx >= pvs->nindexes)
761 31 : break;
762 :
472 akapila 763 GIC 66 : indstats = &(pvs->indstats[idx]);
764 :
472 akapila 765 ECB : /*
766 : * Skip vacuuming index that is unsafe for workers or has an
767 : * unsuitable target for parallel index vacuum (this is vacuumed in
768 : * parallel_vacuum_process_unsafe_indexes() by the leader).
769 : */
472 akapila 770 CBC 66 : if (!indstats->parallel_workers_can_process)
472 akapila 771 GIC 26 : continue;
772 :
472 akapila 773 ECB : /* Do vacuum or cleanup of the index */
472 akapila 774 CBC 40 : parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
775 : }
472 akapila 776 ECB :
777 : /*
778 : * We have completed the index vacuum so decrement the active worker
779 : * count.
780 : */
472 akapila 781 GIC 31 : if (VacuumActiveNWorkers)
782 28 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
472 akapila 783 CBC 31 : }
472 akapila 784 ECB :
785 : /*
786 : * Perform parallel vacuuming of indexes in leader process.
787 : *
788 : * Handles index vacuuming (or index cleanup) for indexes that are not
789 : * parallel safe. It's possible that this will vary for a given index, based
790 : * on details like whether we're performing index cleanup right now.
791 : *
792 : * Also performs vacuuming of smaller indexes that fell under the size cutoff
793 : * enforced by parallel_vacuum_compute_workers().
794 : */
795 : static void
472 akapila 796 CBC 14 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
797 : {
472 akapila 798 GIC 14 : Assert(!IsParallelWorker());
799 :
800 : /*
801 : * Increment the active worker count if we are able to launch any worker.
802 : */
803 14 : if (VacuumActiveNWorkers)
804 11 : pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
805 :
806 80 : for (int i = 0; i < pvs->nindexes; i++)
807 : {
808 66 : PVIndStats *indstats = &(pvs->indstats[i]);
472 akapila 809 ECB :
810 : /* Skip, indexes that are safe for workers */
472 akapila 811 CBC 66 : if (indstats->parallel_workers_can_process)
472 akapila 812 GIC 40 : continue;
813 :
814 : /* Do vacuum or cleanup of the index */
815 26 : parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
472 akapila 816 ECB : }
817 :
818 : /*
819 : * We have completed the index vacuum so decrement the active worker
820 : * count.
821 : */
472 akapila 822 GIC 14 : if (VacuumActiveNWorkers)
823 11 : pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
472 akapila 824 CBC 14 : }
472 akapila 825 ECB :
826 : /*
827 : * Vacuum or cleanup index either by leader process or by one of the worker
828 : * process. After vacuuming the index this function copies the index
829 : * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
830 : * segment.
831 : */
832 : static void
472 akapila 833 GIC 66 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
834 : PVIndStats *indstats)
472 akapila 835 ECB : {
472 akapila 836 CBC 66 : IndexBulkDeleteResult *istat = NULL;
472 akapila 837 ECB : IndexBulkDeleteResult *istat_res;
838 : IndexVacuumInfo ivinfo;
839 :
840 : /*
841 : * Update the pointer to the corresponding bulk-deletion result if someone
842 : * has already updated it
843 : */
472 akapila 844 GIC 66 : if (indstats->istat_updated)
845 21 : istat = &(indstats->istat);
472 akapila 846 ECB :
472 akapila 847 GIC 66 : ivinfo.index = indrel;
6 pg 848 GNC 66 : ivinfo.heaprel = pvs->heaprel;
472 akapila 849 GIC 66 : ivinfo.analyze_only = false;
472 akapila 850 CBC 66 : ivinfo.report_progress = false;
450 pg 851 GIC 66 : ivinfo.message_level = DEBUG2;
472 akapila 852 66 : ivinfo.estimated_count = pvs->shared->estimated_count;
853 66 : ivinfo.num_heap_tuples = pvs->shared->reltuples;
854 66 : ivinfo.strategy = pvs->bstrategy;
855 :
856 : /* Update error traceback information */
857 66 : pvs->indname = pstrdup(RelationGetRelationName(indrel));
472 akapila 858 CBC 66 : pvs->status = indstats->status;
472 akapila 859 ECB :
472 akapila 860 GIC 66 : switch (indstats->status)
472 akapila 861 ECB : {
472 akapila 862 CBC 21 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
863 21 : istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
864 21 : break;
865 45 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
866 45 : istat_res = vac_cleanup_one_index(&ivinfo, istat);
867 45 : break;
472 akapila 868 LBC 0 : default:
472 akapila 869 UIC 0 : elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
870 : indstats->status,
472 akapila 871 ECB : RelationGetRelationName(indrel));
872 : }
873 :
874 : /*
875 : * Copy the index bulk-deletion result returned from ambulkdelete and
876 : * amvacuumcleanup to the DSM segment if it's the first cycle because they
877 : * allocate locally and it's possible that an index will be vacuumed by a
878 : * different vacuum process the next cycle. Copying the result normally
879 : * happens only the first time an index is vacuumed. For any additional
880 : * vacuum pass, we directly point to the result on the DSM segment and
881 : * pass it to vacuum index APIs so that workers can update it directly.
472 akapila 882 EUB : *
883 : * Since all vacuum workers write the bulk-deletion result at different
884 : * slots we can write them without locking.
885 : */
472 akapila 886 GIC 66 : if (!indstats->istat_updated && istat_res != NULL)
887 : {
888 37 : memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
889 37 : indstats->istat_updated = true;
890 :
891 : /* Free the locally-allocated bulk-deletion result */
892 37 : pfree(istat_res);
893 : }
894 :
895 : /*
896 : * Update the status to completed. No need to lock here since each worker
897 : * touches different indexes.
898 : */
899 66 : indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
472 akapila 900 ECB :
901 : /* Reset error traceback information */
472 akapila 902 CBC 66 : pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
903 66 : pfree(pvs->indname);
472 akapila 904 GIC 66 : pvs->indname = NULL;
905 66 : }
472 akapila 906 ECB :
907 : /*
908 : * Returns false, if the given index can't participate in the next execution of
909 : * parallel index vacuum or parallel index cleanup.
910 : */
911 : static bool
472 akapila 912 GIC 60 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
472 akapila 913 ECB : bool vacuum)
914 : {
915 : uint8 vacoptions;
916 :
472 akapila 917 CBC 60 : vacoptions = indrel->rd_indam->amparallelvacuumoptions;
472 akapila 918 ECB :
919 : /* In parallel vacuum case, check if it supports parallel bulk-deletion */
472 akapila 920 GIC 60 : if (vacuum)
921 18 : return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
922 :
923 : /* Not safe, if the index does not support parallel cleanup */
924 42 : if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
925 30 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
472 akapila 926 CBC 6 : return false;
927 :
928 : /*
929 : * Not safe, if the index supports parallel cleanup conditionally, but we
930 : * have already processed the index (for bulkdelete). We do this to avoid
472 akapila 931 ECB : * the need to invoke workers when parallel index cleanup doesn't need to
932 : * scan the index. See the comments for option
933 : * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
934 : * parallel cleanup conditionally.
935 : */
472 akapila 936 GIC 36 : if (num_index_scans > 0 &&
937 16 : ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
472 akapila 938 CBC 12 : return false;
472 akapila 939 ECB :
472 akapila 940 CBC 24 : return true;
941 : }
942 :
943 : /*
944 : * Perform work within a launched parallel process.
945 : *
946 : * Since parallel vacuum workers perform only index vacuum or index cleanup,
947 : * we don't need to report progress information.
948 : */
949 : void
950 17 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
472 akapila 951 ECB : {
952 : ParallelVacuumState pvs;
953 : Relation rel;
954 : Relation *indrels;
955 : PVIndStats *indstats;
956 : PVShared *shared;
957 : VacDeadItems *dead_items;
958 : BufferUsage *buffer_usage;
959 : WalUsage *wal_usage;
960 : int nindexes;
961 : char *sharedquery;
962 : ErrorContextCallback errcallback;
963 :
964 : /*
965 : * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
966 : * don't support parallel vacuum for autovacuum as of now.
967 : */
472 akapila 968 GIC 17 : Assert(MyProc->statusFlags == PROC_IN_VACUUM);
969 :
970 17 : elog(DEBUG1, "starting parallel vacuum worker");
971 :
972 17 : shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
973 :
974 : /* Set debug_query_string for individual workers */
975 17 : sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
976 17 : debug_query_string = sharedquery;
977 17 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
978 :
979 : /*
980 : * Open table. The lock mode is the same as the leader process. It's
981 : * okay because the lock mode does not conflict among the parallel
472 akapila 982 ECB : * workers.
983 : */
472 akapila 984 CBC 17 : rel = table_open(shared->relid, ShareUpdateExclusiveLock);
985 :
472 akapila 986 ECB : /*
987 : * Open all indexes. indrels are sorted in order by OID, which should be
988 : * matched to the leader's one.
989 : */
472 akapila 990 CBC 17 : vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
991 17 : Assert(nindexes > 0);
992 :
472 akapila 993 GIC 17 : if (shared->maintenance_work_mem_worker > 0)
994 17 : maintenance_work_mem = shared->maintenance_work_mem_worker;
995 :
996 : /* Set index statistics */
997 17 : indstats = (PVIndStats *) shm_toc_lookup(toc,
472 akapila 998 ECB : PARALLEL_VACUUM_KEY_INDEX_STATS,
999 : false);
1000 :
1001 : /* Set dead_items space */
472 akapila 1002 GIC 17 : dead_items = (VacDeadItems *) shm_toc_lookup(toc,
1003 : PARALLEL_VACUUM_KEY_DEAD_ITEMS,
472 akapila 1004 ECB : false);
1005 :
1006 : /* Set cost-based vacuum delay */
2 dgustafsson 1007 GNC 17 : VacuumUpdateCosts();
472 akapila 1008 CBC 17 : VacuumCostBalance = 0;
472 akapila 1009 GIC 17 : VacuumPageHit = 0;
1010 17 : VacuumPageMiss = 0;
472 akapila 1011 CBC 17 : VacuumPageDirty = 0;
472 akapila 1012 GIC 17 : VacuumCostBalanceLocal = 0;
1013 17 : VacuumSharedCostBalance = &(shared->cost_balance);
1014 17 : VacuumActiveNWorkers = &(shared->active_nworkers);
1015 :
472 akapila 1016 ECB : /* Set parallel vacuum state */
472 akapila 1017 GIC 17 : pvs.indrels = indrels;
1018 17 : pvs.nindexes = nindexes;
1019 17 : pvs.indstats = indstats;
1020 17 : pvs.shared = shared;
472 akapila 1021 CBC 17 : pvs.dead_items = dead_items;
1022 17 : pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
1023 17 : pvs.relname = pstrdup(RelationGetRelationName(rel));
8 andres 1024 GNC 17 : pvs.heaprel = rel;
472 akapila 1025 ECB :
1026 : /* These fields will be filled during index vacuum or cleanup */
472 akapila 1027 CBC 17 : pvs.indname = NULL;
1028 17 : pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
472 akapila 1029 ECB :
1030 : /* Each parallel VACUUM worker gets its own access strategy. */
2 drowley 1031 GNC 34 : pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
1032 17 : shared->ring_nbuffers * (BLCKSZ / 1024));
472 akapila 1033 ECB :
1034 : /* Setup error traceback support for ereport() */
472 akapila 1035 CBC 17 : errcallback.callback = parallel_vacuum_error_callback;
1036 17 : errcallback.arg = &pvs;
1037 17 : errcallback.previous = error_context_stack;
1038 17 : error_context_stack = &errcallback;
472 akapila 1039 ECB :
1040 : /* Prepare to track buffer usage during parallel execution */
472 akapila 1041 GIC 17 : InstrStartParallelQuery();
1042 :
472 akapila 1043 ECB : /* Process indexes to perform vacuum/cleanup */
472 akapila 1044 CBC 17 : parallel_vacuum_process_safe_indexes(&pvs);
1045 :
1046 : /* Report buffer/WAL usage during parallel execution */
1047 17 : buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
1048 17 : wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
472 akapila 1049 GIC 17 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1050 17 : &wal_usage[ParallelWorkerNumber]);
472 akapila 1051 ECB :
1052 : /* Pop the error context stack */
472 akapila 1053 CBC 17 : error_context_stack = errcallback.previous;
472 akapila 1054 ECB :
472 akapila 1055 GIC 17 : vac_close_indexes(nindexes, indrels, RowExclusiveLock);
1056 17 : table_close(rel, ShareUpdateExclusiveLock);
472 akapila 1057 CBC 17 : FreeAccessStrategy(pvs.bstrategy);
472 akapila 1058 GIC 17 : }
1059 :
472 akapila 1060 ECB : /*
1061 : * Error context callback for errors occurring during parallel index vacuum.
1062 : * The error context messages should match the messages set in the lazy vacuum
1063 : * error context. If you change this function, change vacuum_error_callback()
1064 : * as well.
1065 : */
1066 : static void
472 akapila 1067 UIC 0 : parallel_vacuum_error_callback(void *arg)
1068 : {
472 akapila 1069 LBC 0 : ParallelVacuumState *errinfo = arg;
1070 :
1071 0 : switch (errinfo->status)
472 akapila 1072 ECB : {
472 akapila 1073 LBC 0 : case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
1074 0 : errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
1075 : errinfo->indname,
1076 : errinfo->relnamespace,
1077 : errinfo->relname);
472 akapila 1078 UIC 0 : break;
1079 0 : case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
1080 0 : errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
1081 : errinfo->indname,
1082 : errinfo->relnamespace,
472 akapila 1083 EUB : errinfo->relname);
472 akapila 1084 UIC 0 : break;
472 akapila 1085 UBC 0 : case PARALLEL_INDVAC_STATUS_INITIAL:
1086 : case PARALLEL_INDVAC_STATUS_COMPLETED:
472 akapila 1087 EUB : default:
472 akapila 1088 UIC 0 : return;
472 akapila 1089 EUB : }
1090 : }
|