LCOV - differential code coverage report
Current view:   top level - src/backend/commands - vacuumparallel.c (source / functions)
Current:        Differential Code Coverage HEAD vs 15
Current Date:   2023-04-08 15:15:32
Baseline:       15
Baseline Date:  2023-04-08 15:09:40

                Coverage   Total   Hit   LBC   UIC   UBC   GBC   GIC   GNC   CBC   EUB   ECB   DCB
Lines:          95.3 %     322     307   7     7     1     2     173   7     125   12    174   1
Functions:      92.3 %     13      12    (remaining per-category counts as reported: 1 12 1 12)
Legend: Lines: hit / not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * vacuumparallel.c
       4                 :  *    Support routines for parallel vacuum execution.
       5                 :  *
       6                 :  * This file contains routines that are intended to support setting up, using,
       7                 :  * and tearing down a ParallelVacuumState.
       8                 :  *
       9                 :  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
      10                 :  * with parallel worker processes.  Individual indexes are processed by one
       11                 :  * vacuum process.  ParallelVacuumState contains shared information as well as
      12                 :  * the memory space for storing dead items allocated in the DSM segment.  We
      13                 :  * launch parallel worker processes at the start of parallel index
      14                 :  * bulk-deletion and index cleanup and once all indexes are processed, the
      15                 :  * parallel worker processes exit.  Each time we process indexes in parallel,
      16                 :  * the parallel context is re-initialized so that the same DSM can be used for
      17                 :  * multiple passes of index bulk-deletion and index cleanup.
      18                 :  *
      19                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      20                 :  * Portions Copyright (c) 1994, Regents of the University of California
      21                 :  *
      22                 :  * IDENTIFICATION
      23                 :  *    src/backend/commands/vacuumparallel.c
      24                 :  *
      25                 :  *-------------------------------------------------------------------------
      26                 :  */
      27                 : #include "postgres.h"
      28                 : 
      29                 : #include "access/amapi.h"
      30                 : #include "access/table.h"
      31                 : #include "access/xact.h"
      32                 : #include "catalog/index.h"
      33                 : #include "commands/vacuum.h"
      34                 : #include "optimizer/paths.h"
      35                 : #include "pgstat.h"
      36                 : #include "storage/bufmgr.h"
      37                 : #include "tcop/tcopprot.h"
      38                 : #include "utils/lsyscache.h"
      39                 : #include "utils/rel.h"
      40                 : 
      41                 : /*
       42                 :  * DSM keys for parallel vacuum.  Unlike other parallel execution code, we
       43                 :  * don't need to worry about DSM keys conflicting with plan_node_id, so we can
       44                 :  * use small integers.
      45                 :  */
      46                 : #define PARALLEL_VACUUM_KEY_SHARED          1
      47                 : #define PARALLEL_VACUUM_KEY_DEAD_ITEMS      2
      48                 : #define PARALLEL_VACUUM_KEY_QUERY_TEXT      3
      49                 : #define PARALLEL_VACUUM_KEY_BUFFER_USAGE    4
      50                 : #define PARALLEL_VACUUM_KEY_WAL_USAGE       5
      51                 : #define PARALLEL_VACUUM_KEY_INDEX_STATS     6
      52                 : 
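Each of these keys names one chunk published in the DSM table of contents.  On the
worker side the chunks are resolved with shm_toc_lookup(); a minimal sketch of that
pattern, assuming toc is the worker's table of contents (the local variable names
here are illustrative only):

    PVShared     *shared;
    PVIndStats   *indstats;
    VacDeadItems *dead_items;

    /* Look up the chunks the leader stored under the keys defined above. */
    shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
    indstats = (PVIndStats *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_INDEX_STATS, false);
    dead_items = (VacDeadItems *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, false);
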
      53                 : /*
       54                 :  * Shared information among parallel workers, so this is allocated in the DSM
       55                 :  * segment.
      56                 :  */
      57                 : typedef struct PVShared
      58                 : {
      59                 :     /*
      60                 :      * Target table relid and log level (for messages about parallel workers
      61                 :      * launched during VACUUM VERBOSE).  These fields are not modified during
      62                 :      * the parallel vacuum.
      63                 :      */
      64                 :     Oid         relid;
      65                 :     int         elevel;
      66                 : 
      67                 :     /*
      68                 :      * Fields for both index vacuum and cleanup.
      69                 :      *
       70                 :      * reltuples is the total number of input heap tuples.  We set it to the
       71                 :      * old live tuples count in the index vacuum case and to the new live
       72                 :      * tuples count in the index cleanup case.
      73                 :      *
      74                 :      * estimated_count is true if reltuples is an estimated value.  (Note that
      75                 :      * reltuples could be -1 in this case, indicating we have no idea.)
      76                 :      */
      77                 :     double      reltuples;
      78                 :     bool        estimated_count;
      79                 : 
      80                 :     /*
       81                 :      * In a single-process vacuum we could consume extra memory during index
       82                 :      * vacuuming or cleanup, on top of the memory used for heap scanning.  In
       83                 :      * a parallel vacuum, since each vacuum worker can consume memory equal to
       84                 :      * maintenance_work_mem, the maintenance_work_mem for each worker is
       85                 :      * lowered so that the parallel operation doesn't consume more memory than
       86                 :      * a single-process vacuum.
      87                 :      */
      88                 :     int         maintenance_work_mem_worker;
      89                 : 
      90                 :     /*
      91                 :      * The number of buffers each worker's Buffer Access Strategy ring should
      92                 :      * contain.
      93                 :      */
      94                 :     int         ring_nbuffers;
      95                 : 
      96                 :     /*
      97                 :      * Shared vacuum cost balance.  During parallel vacuum,
      98                 :      * VacuumSharedCostBalance points to this value and it accumulates the
      99                 :      * balance of each parallel vacuum worker.
     100                 :      */
     101                 :     pg_atomic_uint32 cost_balance;
     102                 : 
     103                 :     /*
     104                 :      * Number of active parallel workers.  This is used for computing the
     105                 :      * minimum threshold of the vacuum cost balance before a worker sleeps for
     106                 :      * cost-based delay.
     107                 :      */
     108                 :     pg_atomic_uint32 active_nworkers;
     109                 : 
     110                 :     /* Counter for vacuuming and cleanup */
     111                 :     pg_atomic_uint32 idx;
     112                 : } PVShared;
     113                 : 
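The maintenance_work_mem_worker field above caps per-worker memory as described in
its comment; the value is computed in parallel_vacuum_init() below.  A self-contained
sketch of that arithmetic, with settings assumed purely for illustration:

    #include <stdio.h>

    #define Min(a, b) (((a) < (b)) ? (a) : (b))

    int
    main(void)
    {
        int maintenance_work_mem = 65536;   /* in kB, i.e. 64MB; assumed GUC value */
        int parallel_workers = 4;           /* assumed number of launched workers */
        int nindexes_mwm = 2;               /* assumed count of indexes whose AM sets amusemaintenanceworkmem */

        /* Same formula as in parallel_vacuum_init(). */
        int maintenance_work_mem_worker = (nindexes_mwm > 0)
            ? maintenance_work_mem / Min(parallel_workers, nindexes_mwm)
            : maintenance_work_mem;

        /* Prints 32768 kB: two memory-hungry indexes split the 64MB budget. */
        printf("maintenance_work_mem_worker = %d kB\n", maintenance_work_mem_worker);
        return 0;
    }
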
     114                 : /* Status used during parallel index vacuum or cleanup */
     115                 : typedef enum PVIndVacStatus
     116                 : {
     117                 :     PARALLEL_INDVAC_STATUS_INITIAL = 0,
     118                 :     PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
     119                 :     PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
     120                 :     PARALLEL_INDVAC_STATUS_COMPLETED
     121                 : } PVIndVacStatus;
     122                 : 
     123                 : /*
      124                 :  * Struct for per-index vacuum statistics used by parallel vacuum.  This includes
      125                 :  * the status of the parallel index vacuum as well as the index statistics.
     126                 :  */
     127                 : typedef struct PVIndStats
     128                 : {
     129                 :     /*
      130                 :      * The following two fields are set by the leader process before executing
      131                 :      * parallel index vacuum or parallel index cleanup.  These fields are not
      132                 :      * fixed for the entire VACUUM operation; they are only fixed for an
      133                 :      * individual parallel index vacuum and cleanup pass.
      134                 :      *
      135                 :      * parallel_workers_can_process is true if both the leader and workers can
      136                 :      * process the index; otherwise only the leader can process it.
     137                 :      */
     138                 :     PVIndVacStatus status;
     139                 :     bool        parallel_workers_can_process;
     140                 : 
     141                 :     /*
     142                 :      * Individual worker or leader stores the result of index vacuum or
     143                 :      * cleanup.
     144                 :      */
     145                 :     bool        istat_updated;  /* are the stats updated? */
     146                 :     IndexBulkDeleteResult istat;
     147                 : } PVIndStats;
     148                 : 
     149                 : /*
     150                 :  * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
     151                 :  */
     152                 : struct ParallelVacuumState
     153                 : {
     154                 :     /* NULL for worker processes */
     155                 :     ParallelContext *pcxt;
     156                 : 
     157                 :     /* Parent Heap Relation */
     158                 :     Relation    heaprel;
     159                 : 
     160                 :     /* Target indexes */
     161                 :     Relation   *indrels;
     162                 :     int         nindexes;
     163                 : 
     164                 :     /* Shared information among parallel vacuum workers */
     165                 :     PVShared   *shared;
     166                 : 
     167                 :     /*
      168                 :      * Shared index statistics among parallel vacuum workers.  An array
      169                 :      * element is allocated for every index, even those indexes where parallel
      170                 :      * index vacuuming is unsafe or not worthwhile (e.g.,
      171                 :      * will_parallel_vacuum[] is false).  During parallel vacuum, the
      172                 :      * IndexBulkDeleteResult of each index is kept in DSM and is copied into
      173                 :      * local memory at the end of the parallel vacuum.
     174                 :      */
     175                 :     PVIndStats *indstats;
     176                 : 
     177                 :     /* Shared dead items space among parallel vacuum workers */
     178                 :     VacDeadItems *dead_items;
     179                 : 
     180                 :     /* Points to buffer usage area in DSM */
     181                 :     BufferUsage *buffer_usage;
     182                 : 
     183                 :     /* Points to WAL usage area in DSM */
     184                 :     WalUsage   *wal_usage;
     185                 : 
     186                 :     /*
      187                 :      * False if the index is a totally unsuitable target for all parallel
      188                 :      * processing.  For example, the index could be smaller than the
      189                 :      * min_parallel_index_scan_size cutoff.
     190                 :      */
     191                 :     bool       *will_parallel_vacuum;
     192                 : 
     193                 :     /*
     194                 :      * The number of indexes that support parallel index bulk-deletion and
     195                 :      * parallel index cleanup respectively.
     196                 :      */
     197                 :     int         nindexes_parallel_bulkdel;
     198                 :     int         nindexes_parallel_cleanup;
     199                 :     int         nindexes_parallel_condcleanup;
     200                 : 
     201                 :     /* Buffer access strategy used by leader process */
     202                 :     BufferAccessStrategy bstrategy;
     203                 : 
     204                 :     /*
      205                 :      * Error reporting state.  The error callback is set only for worker
      206                 :      * processes during parallel index vacuum.
     207                 :      */
     208                 :     char       *relnamespace;
     209                 :     char       *relname;
     210                 :     char       *indname;
     211                 :     PVIndVacStatus status;
     212                 : };
     213                 : 
     214                 : static int  parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     215                 :                                             bool *will_parallel_vacuum);
     216                 : static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     217                 :                                                 bool vacuum);
     218                 : static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
     219                 : static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
     220                 : static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     221                 :                                               PVIndStats *indstats);
     222                 : static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     223                 :                                                    bool vacuum);
     224                 : static void parallel_vacuum_error_callback(void *arg);
     225                 : 
     226                 : /*
     227                 :  * Try to enter parallel mode and create a parallel context.  Then initialize
     228                 :  * shared memory state.
     229                 :  *
     230                 :  * On success, return parallel vacuum state.  Otherwise return NULL.
     231                 :  */
     232                 : ParallelVacuumState *
     233 GIC       15832 : parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
     234                 :                      int nrequested_workers, int max_items,
     235                 :                      int elevel, BufferAccessStrategy bstrategy)
     236                 : {
     237                 :     ParallelVacuumState *pvs;
     238                 :     ParallelContext *pcxt;
     239                 :     PVShared   *shared;
     240                 :     VacDeadItems *dead_items;
     241                 :     PVIndStats *indstats;
     242 ECB             :     BufferUsage *buffer_usage;
     243                 :     WalUsage   *wal_usage;
     244                 :     bool       *will_parallel_vacuum;
     245                 :     Size        est_indstats_len;
     246                 :     Size        est_shared_len;
     247                 :     Size        est_dead_items_len;
     248 GIC       15832 :     int         nindexes_mwm = 0;
     249           15832 :     int         parallel_workers = 0;
     250                 :     int         querylen;
     251                 : 
     252                 :     /*
     253                 :      * A parallel vacuum must be requested and there must be indexes on the
     254                 :      * relation
     255                 :      */
     256           15832 :     Assert(nrequested_workers >= 0);
     257 CBC       15832 :     Assert(nindexes > 0);
     258 ECB             : 
     259                 :     /*
     260                 :      * Compute the number of parallel vacuum workers to launch
     261                 :      */
     262 GIC       15832 :     will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
     263           15832 :     parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
     264                 :                                                        nrequested_workers,
     265 ECB             :                                                        will_parallel_vacuum);
     266 CBC       15832 :     if (parallel_workers <= 0)
     267                 :     {
     268                 :         /* Can't perform vacuum in parallel -- return NULL */
     269 GIC       15823 :         pfree(will_parallel_vacuum);
     270           15823 :         return NULL;
     271 ECB             :     }
     272                 : 
     273 GIC           9 :     pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
     274               9 :     pvs->indrels = indrels;
     275 CBC           9 :     pvs->nindexes = nindexes;
     276 GIC           9 :     pvs->will_parallel_vacuum = will_parallel_vacuum;
     277               9 :     pvs->bstrategy = bstrategy;
     278 GNC           9 :     pvs->heaprel = rel;
     279 ECB             : 
     280 CBC           9 :     EnterParallelMode();
     281 GIC           9 :     pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
     282                 :                                  parallel_workers);
     283 CBC           9 :     Assert(pcxt->nworkers > 0);
     284               9 :     pvs->pcxt = pcxt;
     285 ECB             : 
     286                 :     /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
     287 CBC           9 :     est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
     288               9 :     shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
     289 GIC           9 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     290 ECB             : 
     291                 :     /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
     292 GIC           9 :     est_shared_len = sizeof(PVShared);
     293 CBC           9 :     shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
     294               9 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     295                 : 
     296                 :     /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
     297               9 :     est_dead_items_len = vac_max_items_to_alloc_size(max_items);
     298               9 :     shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
     299               9 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     300                 : 
     301                 :     /*
     302 ECB             :      * Estimate space for BufferUsage and WalUsage --
     303                 :      * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
     304                 :      *
     305                 :      * If there are no extensions loaded that care, we could skip this.  We
     306                 :      * have no way of knowing whether anyone's looking at pgBufferUsage or
     307                 :      * pgWalUsage, so do it unconditionally.
     308                 :      */
     309 CBC           9 :     shm_toc_estimate_chunk(&pcxt->estimator,
     310                 :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     311 GIC           9 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     312               9 :     shm_toc_estimate_chunk(&pcxt->estimator,
     313                 :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     314               9 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     315                 : 
     316                 :     /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
     317               9 :     if (debug_query_string)
     318                 :     {
     319 CBC           9 :         querylen = strlen(debug_query_string);
     320 GIC           9 :         shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
     321 CBC           9 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     322 ECB             :     }
     323                 :     else
     324 LBC           0 :         querylen = 0;           /* keep compiler quiet */
     325                 : 
     326 GIC           9 :     InitializeParallelDSM(pcxt);
     327 ECB             : 
     328                 :     /* Prepare index vacuum stats */
     329 CBC           9 :     indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
     330             279 :     MemSet(indstats, 0, est_indstats_len);
     331              54 :     for (int i = 0; i < nindexes; i++)
     332                 :     {
     333 GIC          45 :         Relation    indrel = indrels[i];
     334 GBC          45 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     335                 : 
     336 ECB             :         /*
      337                 :          * The cleanup option should be either disabled, always performed in
      338                 :          * parallel, or conditionally performed in parallel.
     339                 :          */
     340 CBC          45 :         Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
     341 ECB             :                ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
     342 GIC          45 :         Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
     343 ECB             : 
     344 CBC          45 :         if (!will_parallel_vacuum[i])
     345 GIC           3 :             continue;
     346                 : 
     347              42 :         if (indrel->rd_indam->amusemaintenanceworkmem)
     348               6 :             nindexes_mwm++;
     349                 : 
     350 ECB             :         /*
     351                 :          * Remember the number of indexes that support parallel operation for
     352                 :          * each phase.
     353                 :          */
     354 CBC          42 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     355              36 :             pvs->nindexes_parallel_bulkdel++;
     356 GIC          42 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
     357 CBC          12 :             pvs->nindexes_parallel_cleanup++;
     358              42 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
     359 GIC          24 :             pvs->nindexes_parallel_condcleanup++;
     360                 :     }
     361               9 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
     362               9 :     pvs->indstats = indstats;
     363                 : 
     364 ECB             :     /* Prepare shared information */
     365 CBC           9 :     shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
     366              54 :     MemSet(shared, 0, est_shared_len);
     367               9 :     shared->relid = RelationGetRelid(rel);
     368               9 :     shared->elevel = elevel;
     369               9 :     shared->maintenance_work_mem_worker =
     370                 :         (nindexes_mwm > 0) ?
     371               9 :         maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
     372 ECB             :         maintenance_work_mem;
     373                 : 
     374                 :     /* Use the same buffer size for all workers */
     375 GNC           9 :     shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
     376                 : 
     377 GIC           9 :     pg_atomic_init_u32(&(shared->cost_balance), 0);
     378 CBC           9 :     pg_atomic_init_u32(&(shared->active_nworkers), 0);
     379               9 :     pg_atomic_init_u32(&(shared->idx), 0);
     380 ECB             : 
     381 CBC           9 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
     382               9 :     pvs->shared = shared;
     383                 : 
     384 ECB             :     /* Prepare the dead_items space */
     385 GIC           9 :     dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
     386                 :                                                    est_dead_items_len);
     387               9 :     dead_items->max_items = max_items;
     388 CBC           9 :     dead_items->num_items = 0;
     389 GIC           9 :     MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
     390 CBC           9 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
     391               9 :     pvs->dead_items = dead_items;
     392 ECB             : 
     393                 :     /*
     394                 :      * Allocate space for each worker's BufferUsage and WalUsage; no need to
     395                 :      * initialize
     396                 :      */
     397 GIC           9 :     buffer_usage = shm_toc_allocate(pcxt->toc,
     398 CBC           9 :                                     mul_size(sizeof(BufferUsage), pcxt->nworkers));
     399 GIC           9 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
     400 CBC           9 :     pvs->buffer_usage = buffer_usage;
     401               9 :     wal_usage = shm_toc_allocate(pcxt->toc,
     402               9 :                                  mul_size(sizeof(WalUsage), pcxt->nworkers));
     403               9 :     shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
     404               9 :     pvs->wal_usage = wal_usage;
     405                 : 
     406                 :     /* Store query string for workers */
     407 GIC           9 :     if (debug_query_string)
     408                 :     {
     409                 :         char       *sharedquery;
     410 ECB             : 
     411 CBC           9 :         sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
     412               9 :         memcpy(sharedquery, debug_query_string, querylen + 1);
     413               9 :         sharedquery[querylen] = '\0';
     414               9 :         shm_toc_insert(pcxt->toc,
     415 ECB             :                        PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
     416                 :     }
     417                 : 
     418                 :     /* Success -- return parallel vacuum state */
     419 GIC           9 :     return pvs;
     420 ECB             : }
     421                 : 
     422                 : /*
     423                 :  * Destroy the parallel context, and end parallel mode.
     424                 :  *
     425                 :  * Since writes are not allowed during parallel mode, copy the
     426                 :  * updated index statistics from DSM into local memory and then later use that
     427                 :  * to update the index statistics.  One might think that we can exit from
     428                 :  * parallel mode, update the index statistics and then destroy parallel
     429                 :  * context, but that won't be safe (see ExitParallelMode).
     430                 :  */
     431                 : void
     432 CBC           9 : parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
     433                 : {
     434 GIC           9 :     Assert(!IsParallelWorker());
     435                 : 
     436                 :     /* Copy the updated statistics */
     437              54 :     for (int i = 0; i < pvs->nindexes; i++)
     438                 :     {
     439              45 :         PVIndStats *indstats = &(pvs->indstats[i]);
     440                 : 
     441              45 :         if (indstats->istat_updated)
     442                 :         {
     443              37 :             istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     444              37 :             memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
     445 ECB             :         }
     446                 :         else
     447 CBC           8 :             istats[i] = NULL;
     448                 :     }
     449                 : 
     450               9 :     DestroyParallelContext(pvs->pcxt);
     451 GIC           9 :     ExitParallelMode();
     452 ECB             : 
     453 GIC           9 :     pfree(pvs->will_parallel_vacuum);
     454 CBC           9 :     pfree(pvs);
     455 GIC           9 : }
     456 ECB             : 
     457                 : /* Returns the dead items space */
     458                 : VacDeadItems *
     459 GIC           9 : parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
     460 ECB             : {
     461 GIC           9 :     return pvs->dead_items;
     462                 : }
     463 ECB             : 
     464                 : /*
     465                 :  * Do parallel index bulk-deletion with parallel workers.
     466                 :  */
     467                 : void
     468 CBC           5 : parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     469                 :                                     int num_index_scans)
     470                 : {
     471 GIC           5 :     Assert(!IsParallelWorker());
     472 ECB             : 
     473                 :     /*
     474                 :      * We can only provide an approximate value of num_heap_tuples, at least
     475                 :      * for now.
     476                 :      */
     477 GIC           5 :     pvs->shared->reltuples = num_table_tuples;
     478               5 :     pvs->shared->estimated_count = true;
     479                 : 
     480               5 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
     481 CBC           5 : }
     482                 : 
     483                 : /*
     484 ECB             :  * Do parallel index cleanup with parallel workers.
     485                 :  */
     486                 : void
     487 GIC           9 : parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
     488                 :                                     int num_index_scans, bool estimated_count)
     489                 : {
     490 CBC           9 :     Assert(!IsParallelWorker());
     491 ECB             : 
     492                 :     /*
     493                 :      * We can provide a better estimate of total number of surviving tuples
     494                 :      * (we assume indexes are more interested in that than in the number of
     495                 :      * nominally live tuples).
     496                 :      */
     497 GIC           9 :     pvs->shared->reltuples = num_table_tuples;
     498               9 :     pvs->shared->estimated_count = estimated_count;
     499                 : 
     500 CBC           9 :     parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
     501 GIC           9 : }
     502                 : 
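Taken together, parallel_vacuum_init(), parallel_vacuum_get_dead_items(),
parallel_vacuum_bulkdel_all_indexes(), parallel_vacuum_cleanup_all_indexes(), and
parallel_vacuum_end() form the leader-side interface of this module.  A sketch of
how a caller such as the heap vacuum code might drive it; error handling and the
caller's own bookkeeping are omitted, and the local variable names are assumptions:

    ParallelVacuumState *pvs;
    VacDeadItems *dead_items;
    IndexBulkDeleteResult **indstats;   /* caller-allocated, one slot per index */

    /* Returns NULL when a parallel vacuum isn't possible or worthwhile. */
    pvs = parallel_vacuum_init(rel, indrels, nindexes, nrequested_workers,
                               max_items, elevel, bstrategy);
    if (pvs != NULL)
    {
        dead_items = parallel_vacuum_get_dead_items(pvs);

        /* ... fill dead_items while scanning the heap ... */

        /* One parallel bulk-deletion pass per round of index scans. */
        parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples, num_index_scans);

        /* Final parallel cleanup pass once heap scanning is finished. */
        parallel_vacuum_cleanup_all_indexes(pvs, num_table_tuples, num_index_scans,
                                            estimated_count);

        /* Copies the per-index stats out of DSM and exits parallel mode. */
        parallel_vacuum_end(pvs, indstats);
    }
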
     503 ECB             : /*
      504                 :  * Compute the number of parallel worker processes to request.  Both index
      505                 :  * vacuum and index cleanup can be executed with parallel workers.
      506                 :  * An index is eligible for parallel vacuum iff its size is greater than
      507                 :  * min_parallel_index_scan_size, as invoking workers for very small indexes
      508                 :  * can hurt performance.
      509                 :  *
      510                 :  * nrequested is the number of parallel workers that the user requested.  If
      511                 :  * nrequested is 0, we compute the parallel degree from nindexes, that is,
      512                 :  * the number of indexes that support parallel vacuum.  This function also
      513                 :  * sets will_parallel_vacuum to remember which indexes participate in parallel
      514                 :  * vacuum.
     515                 :  */
     516                 : static int
     517 GIC       15832 : parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
     518                 :                                 bool *will_parallel_vacuum)
     519                 : {
     520           15832 :     int         nindexes_parallel = 0;
     521           15832 :     int         nindexes_parallel_bulkdel = 0;
     522           15832 :     int         nindexes_parallel_cleanup = 0;
     523                 :     int         parallel_workers;
     524                 : 
     525                 :     /*
     526                 :      * We don't allow performing parallel operation in standalone backend or
     527                 :      * when parallelism is disabled.
     528                 :      */
     529           15832 :     if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
     530 CBC       14847 :         return 0;
     531                 : 
     532                 :     /*
     533 ECB             :      * Compute the number of indexes that can participate in parallel vacuum.
     534                 :      */
     535 CBC        3231 :     for (int i = 0; i < nindexes; i++)
     536                 :     {
     537 GIC        2246 :         Relation    indrel = indrels[i];
     538            2246 :         uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     539                 : 
     540                 :         /* Skip index that is not a suitable target for parallel index vacuum */
     541            2246 :         if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
     542 CBC        2246 :             RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
     543            2200 :             continue;
     544                 : 
     545 GIC          46 :         will_parallel_vacuum[i] = true;
     546                 : 
     547              46 :         if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
     548 CBC          40 :             nindexes_parallel_bulkdel++;
     549 GIC          46 :         if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
     550 CBC          34 :             ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     551              40 :             nindexes_parallel_cleanup++;
     552                 :     }
     553                 : 
     554             985 :     nindexes_parallel = Max(nindexes_parallel_bulkdel,
     555 ECB             :                             nindexes_parallel_cleanup);
     556                 : 
     557                 :     /* The leader process takes one index */
     558 CBC         985 :     nindexes_parallel--;
     559                 : 
     560 ECB             :     /* No index supports parallel vacuum */
     561 CBC         985 :     if (nindexes_parallel <= 0)
     562             976 :         return 0;
     563 ECB             : 
     564                 :     /* Compute the parallel degree */
     565 GIC           9 :     parallel_workers = (nrequested > 0) ?
     566               9 :         Min(nrequested, nindexes_parallel) : nindexes_parallel;
     567 ECB             : 
     568                 :     /* Cap by max_parallel_maintenance_workers */
     569 GIC           9 :     parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
     570                 : 
     571 CBC           9 :     return parallel_workers;
     572                 : }
     573                 : 
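A self-contained sketch of the capping arithmetic performed above, with the index
counts and the max_parallel_maintenance_workers setting assumed purely for
illustration:

    #include <stdio.h>

    #define Max(a, b) (((a) > (b)) ? (a) : (b))
    #define Min(a, b) (((a) < (b)) ? (a) : (b))

    int
    main(void)
    {
        int nindexes_parallel_bulkdel = 3;        /* assumed: indexes supporting parallel bulk-deletion */
        int nindexes_parallel_cleanup = 2;        /* assumed: indexes supporting (conditional) parallel cleanup */
        int nrequested = 0;                       /* user gave no explicit worker count */
        int max_parallel_maintenance_workers = 2; /* assumed GUC value */

        int nindexes_parallel = Max(nindexes_parallel_bulkdel,
                                    nindexes_parallel_cleanup);

        nindexes_parallel--;                      /* the leader process takes one index */

        if (nindexes_parallel <= 0)
            return 0;                             /* no index needs a separate worker */

        int parallel_workers = (nrequested > 0)
            ? Min(nrequested, nindexes_parallel)
            : nindexes_parallel;

        parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);

        /* Prints 2: three eligible indexes minus the leader's share, capped by the GUC. */
        printf("parallel_workers = %d\n", parallel_workers);
        return 0;
    }
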
     574 ECB             : /*
     575                 :  * Perform index vacuum or index cleanup with parallel workers.  This function
     576                 :  * must be used by the parallel vacuum leader process.
     577                 :  */
     578                 : static void
     579 CBC          14 : parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
     580                 :                                     bool vacuum)
     581                 : {
     582 ECB             :     int         nworkers;
     583                 :     PVIndVacStatus new_status;
     584                 : 
     585 GIC          14 :     Assert(!IsParallelWorker());
     586                 : 
     587              14 :     if (vacuum)
     588                 :     {
     589               5 :         new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
     590                 : 
     591                 :         /* Determine the number of parallel workers to launch */
     592 CBC           5 :         nworkers = pvs->nindexes_parallel_bulkdel;
     593                 :     }
     594                 :     else
     595                 :     {
     596 GIC           9 :         new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
     597                 : 
     598 ECB             :         /* Determine the number of parallel workers to launch */
     599 GIC           9 :         nworkers = pvs->nindexes_parallel_cleanup;
     600 ECB             : 
      601                 :         /* Add conditionally parallel-aware indexes on the first call */
     602 CBC           9 :         if (num_index_scans == 0)
     603 GIC           4 :             nworkers += pvs->nindexes_parallel_condcleanup;
     604                 :     }
     605 ECB             : 
     606                 :     /* The leader process will participate */
     607 GIC          14 :     nworkers--;
     608                 : 
     609 ECB             :     /*
      610                 :      * It is possible that the parallel context was initialized with fewer
      611                 :      * workers than the number of indexes that need a separate worker in the
      612                 :      * current phase, so we need to take that into account.  See
     613                 :      * parallel_vacuum_compute_workers().
     614                 :      */
     615 CBC          14 :     nworkers = Min(nworkers, pvs->pcxt->nworkers);
     616 ECB             : 
     617                 :     /*
     618                 :      * Set index vacuum status and mark whether parallel vacuum worker can
     619                 :      * process it.
     620                 :      */
     621 GIC          80 :     for (int i = 0; i < pvs->nindexes; i++)
     622                 :     {
     623              66 :         PVIndStats *indstats = &(pvs->indstats[i]);
     624                 : 
     625              66 :         Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
     626              66 :         indstats->status = new_status;
     627              66 :         indstats->parallel_workers_can_process =
     628 CBC         126 :             (pvs->will_parallel_vacuum[i] &&
     629 GIC          60 :              parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
     630                 :                                                     num_index_scans,
     631                 :                                                     vacuum));
     632                 :     }
     633                 : 
     634 ECB             :     /* Reset the parallel index processing counter */
     635 GIC          14 :     pg_atomic_write_u32(&(pvs->shared->idx), 0);
     636 ECB             : 
     637                 :     /* Setup the shared cost-based vacuum delay and launch workers */
     638 CBC          14 :     if (nworkers > 0)
     639 ECB             :     {
     640                 :         /* Reinitialize parallel context to relaunch parallel workers */
     641 CBC          11 :         if (num_index_scans > 0)
     642               2 :             ReinitializeParallelDSM(pvs->pcxt);
     643                 : 
     644                 :         /*
     645                 :          * Set up shared cost balance and the number of active workers for
     646                 :          * vacuum delay.  We need to do this before launching workers as
     647                 :          * otherwise, they might not see the updated values for these
     648 ECB             :          * parameters.
     649                 :          */
     650 GIC          11 :         pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
     651 CBC          11 :         pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
     652                 : 
     653                 :         /*
     654 ECB             :          * The number of workers can vary between bulkdelete and cleanup
     655                 :          * phase.
     656                 :          */
     657 GIC          11 :         ReinitializeParallelWorkers(pvs->pcxt, nworkers);
     658                 : 
     659              11 :         LaunchParallelWorkers(pvs->pcxt);
     660                 : 
     661              11 :         if (pvs->pcxt->nworkers_launched > 0)
     662                 :         {
     663 ECB             :             /*
     664                 :              * Reset the local cost values for leader backend as we have
     665                 :              * already accumulated the remaining balance of heap.
     666                 :              */
     667 GIC          11 :             VacuumCostBalance = 0;
     668              11 :             VacuumCostBalanceLocal = 0;
     669                 : 
     670 ECB             :             /* Enable shared cost balance for leader backend */
     671 GIC          11 :             VacuumSharedCostBalance = &(pvs->shared->cost_balance);
     672 CBC          11 :             VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
     673                 :         }
     674 ECB             : 
     675 GIC          11 :         if (vacuum)
     676               5 :             ereport(pvs->shared->elevel,
     677                 :                     (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
     678                 :                                      "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
     679                 :                                      pvs->pcxt->nworkers_launched),
     680 ECB             :                             pvs->pcxt->nworkers_launched, nworkers)));
     681                 :         else
     682 GIC           6 :             ereport(pvs->shared->elevel,
     683                 :                     (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
     684 ECB             :                                      "launched %d parallel vacuum workers for index cleanup (planned: %d)",
     685                 :                                      pvs->pcxt->nworkers_launched),
     686                 :                             pvs->pcxt->nworkers_launched, nworkers)));
     687                 :     }
     688                 : 
     689                 :     /* Vacuum the indexes that can be processed by only leader process */
     690 GIC          14 :     parallel_vacuum_process_unsafe_indexes(pvs);
     691                 : 
     692                 :     /*
     693                 :      * Join as a parallel worker.  The leader vacuums alone processes all
      694                 :      * Join as a parallel worker.  The leader process alone vacuums all
      695                 :      * parallel-safe indexes in the case where no workers are launched.
     696 GIC          14 :     parallel_vacuum_process_safe_indexes(pvs);
     697                 : 
     698                 :     /*
     699                 :      * Next, accumulate buffer and WAL usage.  (This must wait for the workers
     700                 :      * to finish, or we might get incomplete data.)
     701                 :      */
     702              14 :     if (nworkers > 0)
     703 ECB             :     {
     704                 :         /* Wait for all vacuum workers to finish */
     705 GIC          11 :         WaitForParallelWorkersToFinish(pvs->pcxt);
     706                 : 
     707              28 :         for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
     708              17 :             InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
     709 ECB             :     }
     710                 : 
     711                 :     /*
     712                 :      * Reset all index status back to initial (while checking that we have
     713                 :      * vacuumed all indexes).
     714                 :      */
     715 CBC          80 :     for (int i = 0; i < pvs->nindexes; i++)
     716                 :     {
     717 GIC          66 :         PVIndStats *indstats = &(pvs->indstats[i]);
     718 ECB             : 
     719 GIC          66 :         if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
     720 LBC           0 :             elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
     721 ECB             :                  RelationGetRelationName(pvs->indrels[i]));
     722                 : 
     723 GIC          66 :         indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
     724                 :     }
     725                 : 
     726                 :     /*
     727                 :      * Carry the shared balance value to heap scan and disable shared costing
     728 ECB             :      */
     729 GIC          14 :     if (VacuumSharedCostBalance)
     730 ECB             :     {
     731 GIC          11 :         VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
     732 CBC          11 :         VacuumSharedCostBalance = NULL;
     733 GBC          11 :         VacuumActiveNWorkers = NULL;
     734                 :     }
     735 GIC          14 : }
     736 ECB             : 
     737                 : /*
     738                 :  * Index vacuum/cleanup routine used by the leader process and parallel
     739                 :  * vacuum worker processes to vacuum the indexes in parallel.
     740                 :  */
     741                 : static void
     742 CBC          31 : parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
     743                 : {
     744 ECB             :     /*
     745                 :      * Increment the active worker count if we are able to launch any worker.
     746                 :      */
     747 GIC          31 :     if (VacuumActiveNWorkers)
     748 CBC          28 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     749                 : 
     750                 :     /* Loop until all indexes are vacuumed */
     751                 :     for (;;)
     752 GIC          66 :     {
     753                 :         int         idx;
     754                 :         PVIndStats *indstats;
     755 ECB             : 
     756                 :         /* Get an index number to process */
     757 GIC          97 :         idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
     758                 : 
     759                 :         /* Done for all indexes? */
     760 CBC          97 :         if (idx >= pvs->nindexes)
     761              31 :             break;
     762                 : 
     763 GIC          66 :         indstats = &(pvs->indstats[idx]);
     764                 : 
     765 ECB             :         /*
      766                 :          * Skip vacuuming an index that is unsafe for workers or is an
      767                 :          * unsuitable target for parallel index vacuum (such an index is
      768                 :          * vacuumed in parallel_vacuum_process_unsafe_indexes() by the leader).
     769                 :          */
     770 CBC          66 :         if (!indstats->parallel_workers_can_process)
     771 GIC          26 :             continue;
     772                 : 
     773 ECB             :         /* Do vacuum or cleanup of the index */
     774 CBC          40 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
     775                 :     }
     776 ECB             : 
     777                 :     /*
     778                 :      * We have completed the index vacuum so decrement the active worker
     779                 :      * count.
     780                 :      */
     781 GIC          31 :     if (VacuumActiveNWorkers)
     782              28 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     783 CBC          31 : }
     784 ECB             : 
     785                 : /*
     786                 :  * Perform parallel vacuuming of indexes in leader process.
      787                 :  * Perform parallel vacuuming of indexes in the leader process.
     788                 :  * Handles index vacuuming (or index cleanup) for indexes that are not
     789                 :  * parallel safe.  It's possible that this will vary for a given index, based
     790                 :  * on details like whether we're performing index cleanup right now.
     791                 :  *
     792                 :  * Also performs vacuuming of smaller indexes that fell under the size cutoff
     793                 :  * enforced by parallel_vacuum_compute_workers().
     794                 :  */
     795                 : static void
     796 CBC          14 : parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
     797                 : {
     798 GIC          14 :     Assert(!IsParallelWorker());
     799                 : 
     800                 :     /*
     801                 :      * Increment the active worker count if we are able to launch any worker.
     802                 :      */
     803              14 :     if (VacuumActiveNWorkers)
     804              11 :         pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
     805                 : 
     806              80 :     for (int i = 0; i < pvs->nindexes; i++)
     807                 :     {
     808              66 :         PVIndStats *indstats = &(pvs->indstats[i]);
     809 ECB             : 
     810                 :         /* Skip, indexes that are safe for workers */
      811                 :         /* Skip indexes that are safe for workers */
     812 GIC          40 :             continue;
     813                 : 
     814                 :         /* Do vacuum or cleanup of the index */
     815              26 :         parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
     816 ECB             :     }
     817                 : 
     818                 :     /*
     819                 :      * We have completed the index vacuum so decrement the active worker
     820                 :      * count.
     821                 :      */
     822 GIC          14 :     if (VacuumActiveNWorkers)
     823              11 :         pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
     824 CBC          14 : }
     825 ECB             : 
     826                 : /*
      827                 :  * Vacuum or clean up an index, either by the leader process or by one of the
      828                 :  * worker processes.  After vacuuming the index, this function copies the
      829                 :  * index statistics returned from ambulkdelete or amvacuumcleanup to the DSM
      830                 :  * segment.
     831                 :  */
     832                 : static void
     833 GIC          66 : parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
     834                 :                                   PVIndStats *indstats)
     835 ECB             : {
     836 CBC          66 :     IndexBulkDeleteResult *istat = NULL;
     837 ECB             :     IndexBulkDeleteResult *istat_res;
     838                 :     IndexVacuumInfo ivinfo;
     839                 : 
     840                 :     /*
     841                 :      * Update the pointer to the corresponding bulk-deletion result if someone
     842                 :      * has already updated it
     843                 :      */
     844 GIC          66 :     if (indstats->istat_updated)
     845              21 :         istat = &(indstats->istat);
     846 ECB             : 
     847 GIC          66 :     ivinfo.index = indrel;
     848 GNC          66 :     ivinfo.heaprel = pvs->heaprel;
     849 GIC          66 :     ivinfo.analyze_only = false;
     850 CBC          66 :     ivinfo.report_progress = false;
     851 GIC          66 :     ivinfo.message_level = DEBUG2;
     852              66 :     ivinfo.estimated_count = pvs->shared->estimated_count;
     853              66 :     ivinfo.num_heap_tuples = pvs->shared->reltuples;
     854              66 :     ivinfo.strategy = pvs->bstrategy;
     855                 : 
     856                 :     /* Update error traceback information */
     857              66 :     pvs->indname = pstrdup(RelationGetRelationName(indrel));
     858 CBC          66 :     pvs->status = indstats->status;
     859 ECB             : 
     860 GIC          66 :     switch (indstats->status)
     861 ECB             :     {
     862 CBC          21 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
     863              21 :             istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
     864              21 :             break;
     865              45 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
     866              45 :             istat_res = vac_cleanup_one_index(&ivinfo, istat);
     867              45 :             break;
     868 LBC           0 :         default:
     869 UIC           0 :             elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
     870                 :                  indstats->status,
     871 ECB             :                  RelationGetRelationName(indrel));
     872                 :     }
     873                 : 
     874                 :     /*
      875                 :      * Copy the index bulk-deletion result returned from ambulkdelete or
      876                 :      * amvacuumcleanup to the DSM segment if it's the first cycle, because those
      877                 :      * callbacks allocate locally and it's possible that an index will be
      878                 :      * vacuumed by a different vacuum process the next cycle.  Copying the result
      879                 :      * normally happens only the first time an index is vacuumed.  For any
      880                 :      * additional vacuum pass, we directly point to the result on the DSM segment
      881                 :      * and pass it to vacuum index APIs so that workers can update it directly.
     882 EUB             :      *
     883                 :      * Since all vacuum workers write the bulk-deletion result at different
     884                 :      * slots we can write them without locking.
     885                 :      */
     886 GIC          66 :     if (!indstats->istat_updated && istat_res != NULL)
     887                 :     {
     888              37 :         memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
     889              37 :         indstats->istat_updated = true;
     890                 : 
     891                 :         /* Free the locally-allocated bulk-deletion result */
     892              37 :         pfree(istat_res);
     893                 :     }
     894                 : 
     895                 :     /*
     896                 :      * Update the status to completed. No need to lock here since each worker
     897                 :      * touches different indexes.
     898                 :      */
     899              66 :     indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     900 ECB             : 
     901                 :     /* Reset error traceback information */
     902 CBC          66 :     pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
     903              66 :     pfree(pvs->indname);
     904 GIC          66 :     pvs->indname = NULL;
     905              66 : }
     906 ECB             : 
     907                 : /*
      908                 :  * Returns false if the given index can't participate in the next execution of
     909                 :  * parallel index vacuum or parallel index cleanup.
     910                 :  */
     911                 : static bool
     912 GIC          60 : parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
     913 ECB             :                                        bool vacuum)
     914                 : {
     915                 :     uint8       vacoptions;
     916                 : 
     917 CBC          60 :     vacoptions = indrel->rd_indam->amparallelvacuumoptions;
     918 ECB             : 
     919                 :     /* In parallel vacuum case, check if it supports parallel bulk-deletion */
     920 GIC          60 :     if (vacuum)
     921              18 :         return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
     922                 : 
     923                 :     /* Not safe, if the index does not support parallel cleanup */
     924              42 :     if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
     925              30 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
     926 CBC           6 :         return false;
     927                 : 
     928                 :     /*
     929                 :      * Not safe, if the index supports parallel cleanup conditionally, but we
     930                 :      * have already processed the index (for bulkdelete).  We do this to avoid
     931 ECB             :      * the need to invoke workers when parallel index cleanup doesn't need to
     932                 :      * scan the index.  See the comments for option
     933                 :      * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
     934                 :      * parallel cleanup conditionally.
     935                 :      */
     936 GIC          36 :     if (num_index_scans > 0 &&
     937              16 :         ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
     938 CBC          12 :         return false;
     939 ECB             : 
     940 CBC          24 :     return true;
     941                 : }
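/*
 * Editor's hedged sketch (not part of the listing): the vacoptions bits
 * tested by parallel_vacuum_index_is_parallel_safe() are advertised by the
 * index AM's handler function.  An AM that supports parallel bulk-deletion
 * and conditional parallel cleanup (nbtree behaves this way) would set the
 * field roughly as below.  example_amhandler is a hypothetical name; the
 * option flags are from commands/vacuum.h, IndexAmRoutine from access/amapi.h.
 */
Datum
example_amhandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    /* ... fill in the other access method callbacks and capabilities ... */

    amroutine->amparallelvacuumoptions =
        VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;

    PG_RETURN_POINTER(amroutine);
}

/*
 * With those flags, the function above returns true for bulk-deletion and,
 * for cleanup, true only while num_index_scans == 0 (the conditional-cleanup
 * case), matching the hit counts annotated on the branches above.
 */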
     942                 : 
     943                 : /*
     944                 :  * Perform work within a launched parallel process.
     945                 :  *
     946                 :  * Since parallel vacuum workers perform only index vacuum or index cleanup,
     947                 :  * we don't need to report progress information.
     948                 :  */
     949                 : void
     950              17 : parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
     951 ECB             : {
     952                 :     ParallelVacuumState pvs;
     953                 :     Relation    rel;
     954                 :     Relation   *indrels;
     955                 :     PVIndStats *indstats;
     956                 :     PVShared   *shared;
     957                 :     VacDeadItems *dead_items;
     958                 :     BufferUsage *buffer_usage;
     959                 :     WalUsage   *wal_usage;
     960                 :     int         nindexes;
     961                 :     char       *sharedquery;
     962                 :     ErrorContextCallback errcallback;
     963                 : 
     964                 :     /*
      965                 :      * A parallel vacuum worker must have only the PROC_IN_VACUUM flag, since
      966                 :      * we don't currently support parallel vacuum for autovacuum.
     967                 :      */
     968 GIC          17 :     Assert(MyProc->statusFlags == PROC_IN_VACUUM);
     969                 : 
     970              17 :     elog(DEBUG1, "starting parallel vacuum worker");
     971                 : 
     972              17 :     shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
     973                 : 
     974                 :     /* Set debug_query_string for individual workers */
     975              17 :     sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
     976              17 :     debug_query_string = sharedquery;
     977              17 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
     978                 : 
     979                 :     /*
      980                 :      * Open table.  The lock mode is the same as that of the leader process.
      981                 :      * It's okay because the lock mode does not conflict among the parallel
     982 ECB             :      * workers.
     983                 :      */
     984 CBC          17 :     rel = table_open(shared->relid, ShareUpdateExclusiveLock);
     985                 : 
     986 ECB             :     /*
      987                 :      * Open all indexes.  indrels are sorted in order by OID, which should
      988                 :      * match the ordering used by the leader.
     989                 :      */
     990 CBC          17 :     vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
     991              17 :     Assert(nindexes > 0);
     992                 : 
     993 GIC          17 :     if (shared->maintenance_work_mem_worker > 0)
     994              17 :         maintenance_work_mem = shared->maintenance_work_mem_worker;
     995                 : 
     996                 :     /* Set index statistics */
     997              17 :     indstats = (PVIndStats *) shm_toc_lookup(toc,
     998 ECB             :                                              PARALLEL_VACUUM_KEY_INDEX_STATS,
     999                 :                                              false);
    1000                 : 
    1001                 :     /* Set dead_items space */
    1002 GIC          17 :     dead_items = (VacDeadItems *) shm_toc_lookup(toc,
    1003                 :                                                  PARALLEL_VACUUM_KEY_DEAD_ITEMS,
    1004 ECB             :                                                  false);
    1005                 : 
    1006                 :     /* Set cost-based vacuum delay */
    1007 GNC          17 :     VacuumUpdateCosts();
    1008 CBC          17 :     VacuumCostBalance = 0;
    1009 GIC          17 :     VacuumPageHit = 0;
    1010              17 :     VacuumPageMiss = 0;
    1011 CBC          17 :     VacuumPageDirty = 0;
    1012 GIC          17 :     VacuumCostBalanceLocal = 0;
    1013              17 :     VacuumSharedCostBalance = &(shared->cost_balance);
    1014              17 :     VacuumActiveNWorkers = &(shared->active_nworkers);
    1015                 : 
    1016 ECB             :     /* Set parallel vacuum state */
    1017 GIC          17 :     pvs.indrels = indrels;
    1018              17 :     pvs.nindexes = nindexes;
    1019              17 :     pvs.indstats = indstats;
    1020              17 :     pvs.shared = shared;
    1021 CBC          17 :     pvs.dead_items = dead_items;
    1022              17 :     pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
    1023              17 :     pvs.relname = pstrdup(RelationGetRelationName(rel));
    1024 GNC          17 :     pvs.heaprel = rel;
    1025 ECB             : 
    1026                 :     /* These fields will be filled during index vacuum or cleanup */
    1027 CBC          17 :     pvs.indname = NULL;
    1028              17 :     pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
    1029 ECB             : 
    1030                 :     /* Each parallel VACUUM worker gets its own access strategy. */
    1031 GNC          34 :     pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
    1032              17 :                                               shared->ring_nbuffers * (BLCKSZ / 1024));
    1033 ECB             : 
    1034                 :     /* Setup error traceback support for ereport() */
    1035 CBC          17 :     errcallback.callback = parallel_vacuum_error_callback;
    1036              17 :     errcallback.arg = &pvs;
    1037              17 :     errcallback.previous = error_context_stack;
    1038              17 :     error_context_stack = &errcallback;
    1039 ECB             : 
    1040                 :     /* Prepare to track buffer usage during parallel execution */
    1041 GIC          17 :     InstrStartParallelQuery();
    1042                 : 
    1043 ECB             :     /* Process indexes to perform vacuum/cleanup */
    1044 CBC          17 :     parallel_vacuum_process_safe_indexes(&pvs);
    1045                 : 
    1046                 :     /* Report buffer/WAL usage during parallel execution */
    1047              17 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    1048              17 :     wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
    1049 GIC          17 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1050              17 :                           &wal_usage[ParallelWorkerNumber]);
    1051 ECB             : 
    1052                 :     /* Pop the error context stack */
    1053 CBC          17 :     error_context_stack = errcallback.previous;
    1054 ECB             : 
    1055 GIC          17 :     vac_close_indexes(nindexes, indrels, RowExclusiveLock);
    1056              17 :     table_close(rel, ShareUpdateExclusiveLock);
    1057 CBC          17 :     FreeAccessStrategy(pvs.bstrategy);
    1058 GIC          17 : }
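/*
 * Editor's hedged sketch (not part of the listing): parallel_vacuum_main()
 * is reached when the leader creates a parallel context naming this entry
 * point and launches workers.  The sketch below is a single-pass
 * simplification (the real code keeps the context and re-initializes the
 * same DSM across bulk-deletion and cleanup passes) and it elides the
 * shm_toc estimation and insertion of the PARALLEL_VACUUM_KEY_* entries.
 * example_launch_workers is a hypothetical name; the calls used are from
 * access/parallel.h.
 */
static void
example_launch_workers(int nworkers)
{
    ParallelContext *pcxt;

    EnterParallelMode();
    pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", nworkers);

    /* shm_toc_estimate_*() calls go here, shm_toc_insert() after InitializeParallelDSM() */
    InitializeParallelDSM(pcxt);

    LaunchParallelWorkers(pcxt);        /* each worker runs parallel_vacuum_main() */
    WaitForParallelWorkersToFinish(pcxt);

    DestroyParallelContext(pcxt);
    ExitParallelMode();
}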
    1059                 : 
    1060 ECB             : /*
    1061                 :  * Error context callback for errors occurring during parallel index vacuum.
    1062                 :  * The error context messages should match the messages set in the lazy vacuum
    1063                 :  * error context.  If you change this function, change vacuum_error_callback()
    1064                 :  * as well.
    1065                 :  */
    1066                 : static void
    1067 UIC           0 : parallel_vacuum_error_callback(void *arg)
    1068                 : {
    1069 LBC           0 :     ParallelVacuumState *errinfo = arg;
    1070                 : 
    1071               0 :     switch (errinfo->status)
    1072 ECB             :     {
    1073 LBC           0 :         case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
    1074               0 :             errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
    1075                 :                        errinfo->indname,
    1076                 :                        errinfo->relnamespace,
    1077                 :                        errinfo->relname);
    1078 UIC           0 :             break;
    1079               0 :         case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
    1080               0 :             errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
    1081                 :                        errinfo->indname,
    1082                 :                        errinfo->relnamespace,
    1083 EUB             :                        errinfo->relname);
    1084 UIC           0 :             break;
    1085 UBC           0 :         case PARALLEL_INDVAC_STATUS_INITIAL:
    1086                 :         case PARALLEL_INDVAC_STATUS_COMPLETED:
    1087 EUB             :         default:
    1088 UIC           0 :             return;
    1089 EUB             :     }
    1090                 : }
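/*
 * Editor's hedged note (not part of the listing): when an error is raised
 * while a worker is in PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, the callback
 * above adds a context line built from the format string shown, e.g. (index
 * and relation names here are hypothetical):
 *
 *     CONTEXT:  while vacuuming index "some_index" of relation "public.some_table"
 *
 * The INITIAL and COMPLETED states intentionally add no context line, since
 * no index is being processed at that point.
 */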
        

Generated by: LCOV version v1.16-55-g56c0a2a