LCOV - differential code coverage report
Current view: top level - src/backend/access/transam - parallel.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 90.3 % 487 440 7 38 2 10 266 12 152 35 280 1
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 19 19 19 19
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * parallel.c
       4                 :  *    Infrastructure for launching parallel workers
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * IDENTIFICATION
      10                 :  *    src/backend/access/transam/parallel.c
      11                 :  *
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include "access/nbtree.h"
      18                 : #include "access/parallel.h"
      19                 : #include "access/session.h"
      20                 : #include "access/xact.h"
      21                 : #include "access/xlog.h"
      22                 : #include "catalog/index.h"
      23                 : #include "catalog/namespace.h"
      24                 : #include "catalog/pg_enum.h"
      25                 : #include "catalog/storage.h"
      26                 : #include "commands/async.h"
      27                 : #include "commands/vacuum.h"
      28                 : #include "executor/execParallel.h"
      29                 : #include "libpq/libpq.h"
      30                 : #include "libpq/pqformat.h"
      31                 : #include "libpq/pqmq.h"
      32                 : #include "miscadmin.h"
      33                 : #include "optimizer/optimizer.h"
      34                 : #include "pgstat.h"
      35                 : #include "storage/ipc.h"
      36                 : #include "storage/predicate.h"
      37                 : #include "storage/sinval.h"
      38                 : #include "storage/spin.h"
      39                 : #include "tcop/tcopprot.h"
      40                 : #include "utils/combocid.h"
      41                 : #include "utils/guc.h"
      42                 : #include "utils/inval.h"
      43                 : #include "utils/memutils.h"
      44                 : #include "utils/relmapper.h"
      45                 : #include "utils/snapmgr.h"
      46                 : #include "utils/typcache.h"
      47                 : 
      48                 : /*
      49                 :  * We don't want to waste a lot of memory on an error queue which, most of
      50                 :  * the time, will process only a handful of small messages.  However, it is
      51                 :  * desirable to make it large enough that a typical ErrorResponse can be sent
      52                 :  * without blocking.  That way, a worker that errors out can write the whole
      53                 :  * message into the queue and terminate without waiting for the user backend.
      54                 :  */
      55                 : #define PARALLEL_ERROR_QUEUE_SIZE           16384
      56                 : 
      57                 : /* Magic number for parallel context TOC. */
      58                 : #define PARALLEL_MAGIC                      0x50477c7c
      59                 : 
      60                 : /*
      61                 :  * Magic numbers for per-context parallel state sharing.  Higher-level code
      62                 :  * should use smaller values, leaving these very large ones for use by this
      63                 :  * module.
      64                 :  */
      65                 : #define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
      66                 : #define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
      67                 : #define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
      68                 : #define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
      69                 : #define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
      70                 : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
      71                 : #define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
      72                 : #define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
      73                 : #define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
      74                 : #define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
      75                 : #define PARALLEL_KEY_PENDING_SYNCS          UINT64CONST(0xFFFFFFFFFFFF000B)
      76                 : #define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000C)
      77                 : #define PARALLEL_KEY_RELMAPPER_STATE        UINT64CONST(0xFFFFFFFFFFFF000D)
      78                 : #define PARALLEL_KEY_UNCOMMITTEDENUMS       UINT64CONST(0xFFFFFFFFFFFF000E)
      79                 : #define PARALLEL_KEY_CLIENTCONNINFO         UINT64CONST(0xFFFFFFFFFFFF000F)
      80                 : 
      81                 : /* Fixed-size parallel state. */
      82                 : typedef struct FixedParallelState
      83                 : {
      84                 :     /* Fixed-size state that workers must restore. */
      85                 :     Oid         database_id;
      86                 :     Oid         authenticated_user_id;
      87                 :     Oid         current_user_id;
      88                 :     Oid         outer_user_id;
      89                 :     Oid         temp_namespace_id;
      90                 :     Oid         temp_toast_namespace_id;
      91                 :     int         sec_context;
      92                 :     bool        is_superuser;
      93                 :     PGPROC     *parallel_leader_pgproc;
      94                 :     pid_t       parallel_leader_pid;
      95                 :     BackendId   parallel_leader_backend_id;
      96                 :     TimestampTz xact_ts;
      97                 :     TimestampTz stmt_ts;
      98                 :     SerializableXactHandle serializable_xact_handle;
      99                 : 
     100                 :     /* Mutex protects remaining fields. */
     101                 :     slock_t     mutex;
     102                 : 
     103                 :     /* Maximum XactLastRecEnd of any worker. */
     104                 :     XLogRecPtr  last_xlog_end;
     105                 : } FixedParallelState;
     106                 : 
     107                 : /*
     108                 :  * Our parallel worker number.  We initialize this to -1, meaning that we are
     109                 :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
     110                 :  * and < the number of workers before any user code is invoked; each parallel
     111                 :  * worker will get a different parallel worker number.
     112                 :  */
     113                 : int         ParallelWorkerNumber = -1;
     114                 : 
     115                 : /* Is there a parallel message pending which we need to receive? */
     116                 : volatile sig_atomic_t ParallelMessagePending = false;
     117                 : 
     118                 : /* Are we initializing a parallel worker? */
     119                 : bool        InitializingParallelWorker = false;
     120                 : 
     121                 : /* Pointer to our fixed parallel state. */
     122                 : static FixedParallelState *MyFixedParallelState;
     123                 : 
     124                 : /* List of active parallel contexts. */
     125                 : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
     126                 : 
     127                 : /* Backend-local copy of data from FixedParallelState. */
     128                 : static pid_t ParallelLeaderPid;
     129                 : 
     130                 : /*
     131                 :  * List of internal parallel worker entry points.  We need this for
     132                 :  * reasons explained in LookupParallelWorkerFunction(), below.
     133                 :  */
     134                 : static const struct
     135                 : {
     136                 :     const char *fn_name;
     137                 :     parallel_worker_main_type fn_addr;
     138                 : }           InternalParallelWorkers[] =
     139                 : 
     140                 : {
     141                 :     {
     142                 :         "ParallelQueryMain", ParallelQueryMain
     143                 :     },
     144                 :     {
     145                 :         "_bt_parallel_build_main", _bt_parallel_build_main
     146                 :     },
     147                 :     {
     148                 :         "parallel_vacuum_main", parallel_vacuum_main
     149                 :     }
     150                 : };
     151                 : 
     152                 : /* Private functions. */
     153                 : static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
     154                 : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
     155                 : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
     156                 : static void ParallelWorkerShutdown(int code, Datum arg);
     157                 : 
     158                 : 
     159                 : /*
     160                 :  * Establish a new parallel context.  This should be done after entering
     161                 :  * parallel mode, and (unless there is an error) the context should be
     162                 :  * destroyed before exiting the current subtransaction.
     163                 :  */
     164                 : ParallelContext *
     165 GIC         403 : CreateParallelContext(const char *library_name, const char *function_name,
     166 ECB             :                       int nworkers)
     167                 : {
     168                 :     MemoryContext oldcontext;
     169                 :     ParallelContext *pcxt;
     170                 : 
     171                 :     /* It is unsafe to create a parallel context if not in parallel mode. */
     172 GIC         403 :     Assert(IsInParallelMode());
     173 ECB             : 
     174                 :     /* Number of workers should be non-negative. */
     175 GIC         403 :     Assert(nworkers >= 0);
     176 ECB             : 
     177                 :     /* We might be running in a short-lived memory context. */
     178 GIC         403 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     179 ECB             : 
     180                 :     /* Initialize a new ParallelContext. */
     181 GIC         403 :     pcxt = palloc0(sizeof(ParallelContext));
     182 CBC         403 :     pcxt->subid = GetCurrentSubTransactionId();
     183             403 :     pcxt->nworkers = nworkers;
     184             403 :     pcxt->nworkers_to_launch = nworkers;
     185             403 :     pcxt->library_name = pstrdup(library_name);
     186             403 :     pcxt->function_name = pstrdup(function_name);
     187             403 :     pcxt->error_context_stack = error_context_stack;
     188             403 :     shm_toc_initialize_estimator(&pcxt->estimator);
     189             403 :     dlist_push_head(&pcxt_list, &pcxt->node);
     190 ECB             : 
     191                 :     /* Restore previous memory context. */
     192 GIC         403 :     MemoryContextSwitchTo(oldcontext);
     193 ECB             : 
     194 GIC         403 :     return pcxt;
     195 ECB             : }
     196                 : 
     197                 : /*
     198                 :  * Establish the dynamic shared memory segment for a parallel context and
     199                 :  * copy state and other bookkeeping information that will be needed by
     200                 :  * parallel workers into it.
     201                 :  */
     202                 : void
     203 GIC         403 : InitializeParallelDSM(ParallelContext *pcxt)
     204 ECB             : {
     205                 :     MemoryContext oldcontext;
     206 GIC         403 :     Size        library_len = 0;
     207 CBC         403 :     Size        guc_len = 0;
     208             403 :     Size        combocidlen = 0;
     209             403 :     Size        tsnaplen = 0;
     210             403 :     Size        asnaplen = 0;
     211             403 :     Size        tstatelen = 0;
     212             403 :     Size        pendingsyncslen = 0;
     213             403 :     Size        reindexlen = 0;
     214             403 :     Size        relmapperlen = 0;
     215             403 :     Size        uncommittedenumslen = 0;
     216 GNC         403 :     Size        clientconninfolen = 0;
     217 CBC         403 :     Size        segsize = 0;
     218 ECB             :     int         i;
     219                 :     FixedParallelState *fps;
     220 GIC         403 :     dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
     221             403 :     Snapshot    transaction_snapshot = GetTransactionSnapshot();
     222 CBC         403 :     Snapshot    active_snapshot = GetActiveSnapshot();
     223 ECB             : 
     224                 :     /* We might be running in a very short-lived memory context. */
     225 GIC         403 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     226                 : 
     227 ECB             :     /* Allow space to store the fixed-size parallel state. */
     228 GIC         403 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
     229             403 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     230 ECB             : 
     231                 :     /*
     232                 :      * Normally, the user will have requested at least one worker process, but
     233                 :      * if by chance they have not, we can skip a bunch of things here.
     234                 :      */
     235 GIC         403 :     if (pcxt->nworkers > 0)
     236                 :     {
     237 ECB             :         /* Get (or create) the per-session DSM segment's handle. */
     238 GIC         403 :         session_dsm_handle = GetSessionDsmHandle();
     239                 : 
     240 ECB             :         /*
     241                 :          * If we weren't able to create a per-session DSM segment, then we can
     242                 :          * continue but we can't safely launch any workers because their
     243                 :          * record typmods would be incompatible so they couldn't exchange
     244                 :          * tuples.
     245                 :          */
     246 GIC         403 :         if (session_dsm_handle == DSM_HANDLE_INVALID)
     247 UIC           0 :             pcxt->nworkers = 0;
     248 ECB             :     }
     249 EUB             : 
     250 GIC         403 :     if (pcxt->nworkers > 0)
     251                 :     {
     252 ECB             :         /* Estimate space for various kinds of state sharing. */
     253 GIC         403 :         library_len = EstimateLibraryStateSpace();
     254             403 :         shm_toc_estimate_chunk(&pcxt->estimator, library_len);
     255 CBC         403 :         guc_len = EstimateGUCStateSpace();
     256             403 :         shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
     257             403 :         combocidlen = EstimateComboCIDStateSpace();
     258             403 :         shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
     259             403 :         if (IsolationUsesXactSnapshot())
     260 ECB             :         {
     261 CBC          11 :             tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
     262 GIC          11 :             shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
     263 ECB             :         }
     264 CBC         403 :         asnaplen = EstimateSnapshotSpace(active_snapshot);
     265 GIC         403 :         shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
     266 CBC         403 :         tstatelen = EstimateTransactionStateSpace();
     267             403 :         shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
     268             403 :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
     269             403 :         pendingsyncslen = EstimatePendingSyncsSpace();
     270             403 :         shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
     271             403 :         reindexlen = EstimateReindexStateSpace();
     272             403 :         shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
     273             403 :         relmapperlen = EstimateRelationMapSpace();
     274             403 :         shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
     275             403 :         uncommittedenumslen = EstimateUncommittedEnumsSpace();
     276             403 :         shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
     277 GNC         403 :         clientconninfolen = EstimateClientConnectionInfoSpace();
     278             403 :         shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
     279 ECB             :         /* If you add more chunks here, you probably need to add keys. */
     280 GNC         403 :         shm_toc_estimate_keys(&pcxt->estimator, 12);
     281 ECB             : 
     282                 :         /* Estimate space need for error queues. */
     283                 :         StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
     284                 :                          PARALLEL_ERROR_QUEUE_SIZE,
     285                 :                          "parallel error queue size not buffer-aligned");
     286 GIC         403 :         shm_toc_estimate_chunk(&pcxt->estimator,
     287                 :                                mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     288                 :                                         pcxt->nworkers));
     289             403 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     290 ECB             : 
     291                 :         /* Estimate how much we'll need for the entrypoint info. */
     292 GIC         403 :         shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
     293 ECB             :                                strlen(pcxt->function_name) + 2);
     294 GIC         403 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     295                 :     }
     296 ECB             : 
     297                 :     /*
     298                 :      * Create DSM and initialize with new table of contents.  But if the user
     299                 :      * didn't request any workers, then don't bother creating a dynamic shared
     300                 :      * memory segment; instead, just use backend-private memory.
     301                 :      *
     302                 :      * Also, if we can't create a dynamic shared memory segment because the
     303                 :      * maximum number of segments have already been created, then fall back to
     304                 :      * backend-private memory, and plan not to use any workers.  We hope this
     305                 :      * won't happen very often, but it's better to abandon the use of
     306                 :      * parallelism than to fail outright.
     307                 :      */
     308 GIC         403 :     segsize = shm_toc_estimate(&pcxt->estimator);
     309             403 :     if (pcxt->nworkers > 0)
     310             403 :         pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
     311             403 :     if (pcxt->seg != NULL)
     312 CBC         403 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
     313 ECB             :                                    dsm_segment_address(pcxt->seg),
     314                 :                                    segsize);
     315                 :     else
     316                 :     {
     317 UIC           0 :         pcxt->nworkers = 0;
     318               0 :         pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
     319               0 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
     320                 :                                    segsize);
     321 EUB             :     }
     322                 : 
     323                 :     /* Initialize fixed-size state in shared memory. */
     324                 :     fps = (FixedParallelState *)
     325 GIC         403 :         shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
     326             403 :     fps->database_id = MyDatabaseId;
     327             403 :     fps->authenticated_user_id = GetAuthenticatedUserId();
     328             403 :     fps->outer_user_id = GetCurrentRoleId();
     329 CBC         403 :     fps->is_superuser = session_auth_is_superuser;
     330             403 :     GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
     331             403 :     GetTempNamespaceState(&fps->temp_namespace_id,
     332 ECB             :                           &fps->temp_toast_namespace_id);
     333 CBC         403 :     fps->parallel_leader_pgproc = MyProc;
     334             403 :     fps->parallel_leader_pid = MyProcPid;
     335             403 :     fps->parallel_leader_backend_id = MyBackendId;
     336 GIC         403 :     fps->xact_ts = GetCurrentTransactionStartTimestamp();
     337 CBC         403 :     fps->stmt_ts = GetCurrentStatementStartTimestamp();
     338             403 :     fps->serializable_xact_handle = ShareSerializableXact();
     339             403 :     SpinLockInit(&fps->mutex);
     340             403 :     fps->last_xlog_end = 0;
     341             403 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
     342 ECB             : 
     343                 :     /* We can skip the rest of this if we're not budgeting for any workers. */
     344 CBC         403 :     if (pcxt->nworkers > 0)
     345 ECB             :     {
     346                 :         char       *libraryspace;
     347                 :         char       *gucspace;
     348                 :         char       *combocidspace;
     349                 :         char       *tsnapspace;
     350                 :         char       *asnapspace;
     351                 :         char       *tstatespace;
     352                 :         char       *pendingsyncsspace;
     353                 :         char       *reindexspace;
     354                 :         char       *relmapperspace;
     355                 :         char       *error_queue_space;
     356                 :         char       *session_dsm_handle_space;
     357                 :         char       *entrypointstate;
     358                 :         char       *uncommittedenumsspace;
     359                 :         char       *clientconninfospace;
     360                 :         Size        lnamelen;
     361                 : 
     362                 :         /* Serialize shared libraries we have loaded. */
     363 GIC         403 :         libraryspace = shm_toc_allocate(pcxt->toc, library_len);
     364             403 :         SerializeLibraryState(library_len, libraryspace);
     365             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
     366                 : 
     367                 :         /* Serialize GUC settings. */
     368 CBC         403 :         gucspace = shm_toc_allocate(pcxt->toc, guc_len);
     369             403 :         SerializeGUCState(guc_len, gucspace);
     370             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
     371                 : 
     372                 :         /* Serialize combo CID state. */
     373             403 :         combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
     374             403 :         SerializeComboCIDState(combocidlen, combocidspace);
     375             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
     376                 : 
     377                 :         /*
     378 ECB             :          * Serialize the transaction snapshot if the transaction
     379                 :          * isolation-level uses a transaction snapshot.
     380                 :          */
     381 GIC         403 :         if (IsolationUsesXactSnapshot())
     382                 :         {
     383              11 :             tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
     384              11 :             SerializeSnapshot(transaction_snapshot, tsnapspace);
     385              11 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
     386 ECB             :                            tsnapspace);
     387                 :         }
     388                 : 
     389                 :         /* Serialize the active snapshot. */
     390 CBC         403 :         asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
     391 GIC         403 :         SerializeSnapshot(active_snapshot, asnapspace);
     392             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
     393                 : 
     394                 :         /* Provide the handle for per-session segment. */
     395 CBC         403 :         session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
     396 ECB             :                                                     sizeof(dsm_handle));
     397 CBC         403 :         *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
     398 GIC         403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
     399                 :                        session_dsm_handle_space);
     400 ECB             : 
     401                 :         /* Serialize transaction state. */
     402 CBC         403 :         tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
     403             403 :         SerializeTransactionState(tstatelen, tstatespace);
     404 GIC         403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
     405                 : 
     406                 :         /* Serialize pending syncs. */
     407 CBC         403 :         pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
     408             403 :         SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
     409             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
     410                 :                        pendingsyncsspace);
     411                 : 
     412 ECB             :         /* Serialize reindex state. */
     413 CBC         403 :         reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
     414             403 :         SerializeReindexState(reindexlen, reindexspace);
     415 GIC         403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
     416                 : 
     417                 :         /* Serialize relmapper state. */
     418 CBC         403 :         relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
     419             403 :         SerializeRelationMap(relmapperlen, relmapperspace);
     420             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
     421                 :                        relmapperspace);
     422                 : 
     423 ECB             :         /* Serialize uncommitted enum state. */
     424 CBC         403 :         uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
     425 ECB             :                                                  uncommittedenumslen);
     426 GIC         403 :         SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
     427             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
     428                 :                        uncommittedenumsspace);
     429 ECB             : 
     430                 :         /* Serialize our ClientConnectionInfo. */
     431 GNC         403 :         clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
     432             403 :         SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
     433             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
     434                 :                        clientconninfospace);
     435                 : 
     436                 :         /* Allocate space for worker information. */
     437 CBC         403 :         pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
     438 ECB             : 
     439                 :         /*
     440                 :          * Establish error queues in dynamic shared memory.
     441                 :          *
     442                 :          * These queues should be used only for transmitting ErrorResponse,
     443                 :          * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
     444                 :          * should be transmitted via separate (possibly larger?) queues.
     445                 :          */
     446                 :         error_queue_space =
     447 GIC         403 :             shm_toc_allocate(pcxt->toc,
     448 ECB             :                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     449 GIC         403 :                                       pcxt->nworkers));
     450            1326 :         for (i = 0; i < pcxt->nworkers; ++i)
     451                 :         {
     452                 :             char       *start;
     453                 :             shm_mq     *mq;
     454                 : 
     455             923 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     456             923 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     457             923 :             shm_mq_set_receiver(mq, MyProc);
     458 CBC         923 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     459                 :         }
     460             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
     461 ECB             : 
     462                 :         /*
     463                 :          * Serialize entrypoint information.  It's unsafe to pass function
     464                 :          * pointers across processes, as the function pointer may be different
     465                 :          * in each process in EXEC_BACKEND builds, so we always pass library
     466                 :          * and function name.  (We use library name "postgres" for functions
     467                 :          * in the core backend.)
     468                 :          */
     469 CBC         403 :         lnamelen = strlen(pcxt->library_name);
     470 GIC         403 :         entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
     471 CBC         403 :                                            strlen(pcxt->function_name) + 2);
     472 GIC         403 :         strcpy(entrypointstate, pcxt->library_name);
     473             403 :         strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
     474             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
     475                 :     }
     476                 : 
     477                 :     /* Restore previous memory context. */
     478             403 :     MemoryContextSwitchTo(oldcontext);
     479             403 : }
     480 ECB             : 
     481                 : /*
     482                 :  * Reinitialize the dynamic shared memory segment for a parallel context such
     483                 :  * that we could launch workers for it again.
     484                 :  */
     485                 : void
     486 GIC         131 : ReinitializeParallelDSM(ParallelContext *pcxt)
     487                 : {
     488                 :     FixedParallelState *fps;
     489 ECB             : 
     490                 :     /* Wait for any old workers to exit. */
     491 GIC         131 :     if (pcxt->nworkers_launched > 0)
     492                 :     {
     493             131 :         WaitForParallelWorkersToFinish(pcxt);
     494             131 :         WaitForParallelWorkersToExit(pcxt);
     495             131 :         pcxt->nworkers_launched = 0;
     496             131 :         if (pcxt->known_attached_workers)
     497 ECB             :         {
     498 GIC         131 :             pfree(pcxt->known_attached_workers);
     499             131 :             pcxt->known_attached_workers = NULL;
     500             131 :             pcxt->nknown_attached_workers = 0;
     501                 :         }
     502 ECB             :     }
     503                 : 
     504                 :     /* Reset a few bits of fixed parallel state to a clean state. */
     505 CBC         131 :     fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     506             131 :     fps->last_xlog_end = 0;
     507 ECB             : 
     508                 :     /* Recreate error queues (if they exist). */
     509 CBC         131 :     if (pcxt->nworkers > 0)
     510 ECB             :     {
     511                 :         char       *error_queue_space;
     512                 :         int         i;
     513                 : 
     514                 :         error_queue_space =
     515 GIC         131 :             shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
     516 CBC         546 :         for (i = 0; i < pcxt->nworkers; ++i)
     517 ECB             :         {
     518                 :             char       *start;
     519                 :             shm_mq     *mq;
     520                 : 
     521 GIC         415 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     522             415 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     523             415 :             shm_mq_set_receiver(mq, MyProc);
     524             415 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     525                 :         }
     526 ECB             :     }
     527 CBC         131 : }
     528                 : 
     529                 : /*
     530                 :  * Reinitialize parallel workers for a parallel context such that we could
     531                 :  * launch a different number of workers.  This is required for cases where
     532 ECB             :  * we need to reuse the same DSM segment, but the number of workers can
     533                 :  * vary from run-to-run.
     534                 :  */
     535                 : void
     536 GIC          11 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
     537                 : {
     538 ECB             :     /*
     539                 :      * The number of workers that need to be launched must be less than the
     540                 :      * number of workers with which the parallel context is initialized.
     541                 :      */
     542 GIC          11 :     Assert(pcxt->nworkers >= nworkers_to_launch);
     543              11 :     pcxt->nworkers_to_launch = nworkers_to_launch;
     544              11 : }
     545                 : 
     546                 : /*
     547 ECB             :  * Launch parallel workers.
     548                 :  */
     549                 : void
     550 GIC         534 : LaunchParallelWorkers(ParallelContext *pcxt)
     551                 : {
     552                 :     MemoryContext oldcontext;
     553 ECB             :     BackgroundWorker worker;
     554                 :     int         i;
     555 CBC         534 :     bool        any_registrations_failed = false;
     556                 : 
     557                 :     /* Skip this if we have no workers. */
     558 GIC         534 :     if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
     559 UIC           0 :         return;
     560                 : 
     561 ECB             :     /* We need to be a lock group leader. */
     562 GIC         534 :     BecomeLockGroupLeader();
     563                 : 
     564                 :     /* If we do have workers, we'd better have a DSM segment. */
     565             534 :     Assert(pcxt->seg != NULL);
     566 ECB             : 
     567                 :     /* We might be running in a short-lived memory context. */
     568 GIC         534 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     569 ECB             : 
     570 EUB             :     /* Configure a worker. */
     571 GIC         534 :     memset(&worker, 0, sizeof(worker));
     572             534 :     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
     573 ECB             :              MyProcPid);
     574 GIC         534 :     snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
     575             534 :     worker.bgw_flags =
     576 ECB             :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
     577                 :         | BGWORKER_CLASS_PARALLEL;
     578 GIC         534 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     579 CBC         534 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     580 GIC         534 :     sprintf(worker.bgw_library_name, "postgres");
     581             534 :     sprintf(worker.bgw_function_name, "ParallelWorkerMain");
     582 CBC         534 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
     583             534 :     worker.bgw_notify_pid = MyProcPid;
     584                 : 
     585 ECB             :     /*
     586                 :      * Start workers.
     587                 :      *
     588                 :      * The caller must be able to tolerate ending up with fewer workers than
     589                 :      * expected, so there is no need to throw an error here if registration
     590                 :      * fails.  It wouldn't help much anyway, because registering the worker in
     591                 :      * no way guarantees that it will start up and initialize successfully.
     592                 :      */
     593 CBC        1870 :     for (i = 0; i < pcxt->nworkers_to_launch; ++i)
     594 ECB             :     {
     595 GIC        1336 :         memcpy(worker.bgw_extra, &i, sizeof(int));
     596            2645 :         if (!any_registrations_failed &&
     597            1309 :             RegisterDynamicBackgroundWorker(&worker,
     598            1309 :                                             &pcxt->worker[i].bgwhandle))
     599                 :         {
     600            1298 :             shm_mq_set_handle(pcxt->worker[i].error_mqh,
     601            1298 :                               pcxt->worker[i].bgwhandle);
     602            1298 :             pcxt->nworkers_launched++;
     603                 :         }
     604 ECB             :         else
     605                 :         {
     606                 :             /*
     607                 :              * If we weren't able to register the worker, then we've bumped up
     608                 :              * against the max_worker_processes limit, and future
     609                 :              * registrations will probably fail too, so arrange to skip them.
     610                 :              * But we still have to execute this code for the remaining slots
     611                 :              * to make sure that we forget about the error queues we budgeted
     612                 :              * for those workers.  Otherwise, we'll wait for them to start,
     613                 :              * but they never will.
     614                 :              */
     615 GIC          38 :             any_registrations_failed = true;
     616              38 :             pcxt->worker[i].bgwhandle = NULL;
     617              38 :             shm_mq_detach(pcxt->worker[i].error_mqh);
     618              38 :             pcxt->worker[i].error_mqh = NULL;
     619                 :         }
     620                 :     }
     621                 : 
     622                 :     /*
     623                 :      * Now that nworkers_launched has taken its final value, we can initialize
     624                 :      * known_attached_workers.
     625                 :      */
     626 CBC         534 :     if (pcxt->nworkers_launched > 0)
     627 ECB             :     {
     628 CBC         525 :         pcxt->known_attached_workers =
     629             525 :             palloc0(sizeof(bool) * pcxt->nworkers_launched);
     630 GIC         525 :         pcxt->nknown_attached_workers = 0;
     631                 :     }
     632                 : 
     633                 :     /* Restore previous memory context. */
     634             534 :     MemoryContextSwitchTo(oldcontext);
     635                 : }
     636                 : 
     637 ECB             : /*
     638                 :  * Wait for all workers to attach to their error queues, and throw an error if
     639                 :  * any worker fails to do this.
     640                 :  *
     641                 :  * Callers can assume that if this function returns successfully, then the
     642                 :  * number of workers given by pcxt->nworkers_launched have initialized and
     643                 :  * attached to their error queues.  Whether or not these workers are guaranteed
     644                 :  * to still be running depends on what code the caller asked them to run;
     645                 :  * this function does not guarantee that they have not exited.  However, it
     646                 :  * does guarantee that any workers which exited must have done so cleanly and
     647                 :  * after successfully performing the work with which they were tasked.
     648                 :  *
     649                 :  * If this function is not called, then some of the workers that were launched
     650                 :  * may not have been started due to a fork() failure, or may have exited during
     651                 :  * early startup prior to attaching to the error queue, so nworkers_launched
     652                 :  * cannot be viewed as completely reliable.  It will never be less than the
     653                 :  * number of workers which actually started, but it might be more.  Any workers
     654                 :  * that failed to start will still be discovered by
     655                 :  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
     656                 :  * provided that function is eventually reached.
     657                 :  *
     658                 :  * In general, the leader process should do as much work as possible before
     659                 :  * calling this function.  fork() failures and other early-startup failures
     660                 :  * are very uncommon, and having the leader sit idle when it could be doing
     661                 :  * useful work is undesirable.  However, if the leader needs to wait for
     662                 :  * all of its workers or for a specific worker, it may want to call this
     663                 :  * function before doing so.  If not, it must make some other provision for
     664                 :  * the failure-to-start case, lest it wait forever.  On the other hand, a
     665                 :  * leader which never waits for a worker that might not be started yet, or
     666                 :  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
     667                 :  * call this function at all.
     668                 :  */
     669                 : void
     670 GIC          71 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
     671                 : {
     672                 :     int         i;
     673                 : 
     674                 :     /* Skip this if we have no launched workers. */
     675              71 :     if (pcxt->nworkers_launched == 0)
     676 UIC           0 :         return;
     677                 : 
     678                 :     for (;;)
     679                 :     {
     680                 :         /*
     681 ECB             :          * This will process any parallel messages that are pending and it may
     682                 :          * also throw an error propagated from a worker.
     683                 :          */
     684 GIC     4193812 :         CHECK_FOR_INTERRUPTS();
     685                 : 
     686 CBC     8387624 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     687 EUB             :         {
     688                 :             BgwHandleStatus status;
     689                 :             shm_mq     *mq;
     690                 :             int         rc;
     691                 :             pid_t       pid;
     692                 : 
     693 GIC     4193812 :             if (pcxt->known_attached_workers[i])
     694              14 :                 continue;
     695 ECB             : 
     696                 :             /*
     697                 :              * If error_mqh is NULL, then the worker has already exited
     698                 :              * cleanly.
     699                 :              */
     700 GIC     4193798 :             if (pcxt->worker[i].error_mqh == NULL)
     701                 :             {
     702 UIC           0 :                 pcxt->known_attached_workers[i] = true;
     703               0 :                 ++pcxt->nknown_attached_workers;
     704 LBC           0 :                 continue;
     705 ECB             :             }
     706                 : 
     707 GIC     4193798 :             status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
     708         4193798 :             if (status == BGWH_STARTED)
     709                 :             {
     710                 :                 /* Has the worker attached to the error queue? */
     711 CBC     4193742 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     712 GIC     4193742 :                 if (shm_mq_get_sender(mq) != NULL)
     713 EUB             :                 {
     714                 :                     /* Yes, so it is known to be attached. */
     715 GBC          57 :                     pcxt->known_attached_workers[i] = true;
     716 GIC          57 :                     ++pcxt->nknown_attached_workers;
     717                 :                 }
     718 ECB             :             }
     719 CBC          56 :             else if (status == BGWH_STOPPED)
     720                 :             {
     721                 :                 /*
     722 ECB             :                  * If the worker stopped without attaching to the error queue,
     723                 :                  * throw an error.
     724                 :                  */
     725 UIC           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     726 LBC           0 :                 if (shm_mq_get_sender(mq) == NULL)
     727               0 :                     ereport(ERROR,
     728                 :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     729                 :                              errmsg("parallel worker failed to initialize"),
     730 ECB             :                              errhint("More details may be available in the server log.")));
     731                 : 
     732 UIC           0 :                 pcxt->known_attached_workers[i] = true;
     733               0 :                 ++pcxt->nknown_attached_workers;
     734                 :             }
     735                 :             else
     736 EUB             :             {
     737                 :                 /*
     738                 :                  * Worker not yet started, so we must wait.  The postmaster
     739                 :                  * will notify us if the worker's state changes.  Our latch
     740                 :                  * might also get set for some other reason, but if so we'll
     741                 :                  * just end up waiting for the same worker again.
     742                 :                  */
     743 GBC          56 :                 rc = WaitLatch(MyLatch,
     744 EUB             :                                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     745                 :                                -1, WAIT_EVENT_BGWORKER_STARTUP);
     746                 : 
     747 GIC          56 :                 if (rc & WL_LATCH_SET)
     748              56 :                     ResetLatch(MyLatch);
     749                 :             }
     750                 :         }
     751                 : 
     752                 :         /* If all workers are known to have started, we're done. */
     753         4193812 :         if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
     754 ECB             :         {
     755 GIC          71 :             Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
     756              71 :             break;
     757                 :         }
     758 ECB             :     }
     759                 : }
     760                 : 
     761                 : /*
     762                 :  * Wait for all workers to finish computing.
     763                 :  *
     764                 :  * Even if the parallel operation seems to have completed successfully, it's
     765                 :  * important to call this function afterwards.  We must not miss any errors
     766                 :  * the workers may have thrown during the parallel operation, or any that they
     767                 :  * may yet throw while shutting down.
     768                 :  *
     769                 :  * Also, we want to update our notion of XactLastRecEnd based on worker
     770                 :  * feedback.
     771                 :  */
     772                 : void
     773 GIC         662 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     774                 : {
     775                 :     for (;;)
     776             569 :     {
     777            1231 :         bool        anyone_alive = false;
     778            1231 :         int         nfinished = 0;
     779                 :         int         i;
     780                 : 
     781                 :         /*
     782                 :          * This will process any parallel messages that are pending, which may
     783                 :          * change the outcome of the loop that follows.  It may also throw an
     784 ECB             :          * error propagated from a worker.
     785                 :          */
     786 GIC        1231 :         CHECK_FOR_INTERRUPTS();
     787 ECB             : 
     788 CBC        3281 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     789 ECB             :         {
     790                 :             /*
     791                 :              * If error_mqh is NULL, then the worker has already exited
     792                 :              * cleanly.  If we have received a message through error_mqh from
     793                 :              * the worker, we know it started up cleanly, and therefore we're
     794                 :              * certain to be notified when it exits.
     795                 :              */
     796 GIC        2591 :             if (pcxt->worker[i].error_mqh == NULL)
     797 CBC        1998 :                 ++nfinished;
     798 GIC         593 :             else if (pcxt->known_attached_workers[i])
     799 ECB             :             {
     800 GIC         541 :                 anyone_alive = true;
     801             541 :                 break;
     802                 :             }
     803                 :         }
     804                 : 
     805            1231 :         if (!anyone_alive)
     806                 :         {
     807 ECB             :             /* If all workers are known to have finished, we're done. */
     808 CBC         690 :             if (nfinished >= pcxt->nworkers_launched)
     809 ECB             :             {
     810 GIC         662 :                 Assert(nfinished == pcxt->nworkers_launched);
     811 CBC         662 :                 break;
     812 ECB             :             }
     813                 : 
     814                 :             /*
     815                 :              * We didn't detect any living workers, but not all workers are
     816                 :              * known to have exited cleanly.  Either not all workers have
     817                 :              * launched yet, or maybe some of them failed to start or
     818                 :              * terminated abnormally.
     819                 :              */
     820 GIC          76 :             for (i = 0; i < pcxt->nworkers_launched; ++i)
     821 ECB             :             {
     822                 :                 pid_t       pid;
     823                 :                 shm_mq     *mq;
     824                 : 
     825                 :                 /*
     826                 :                  * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
     827                 :                  * should just keep waiting.  If it is BGWH_STOPPED, then
     828                 :                  * further investigation is needed.
     829                 :                  */
     830 GIC          48 :                 if (pcxt->worker[i].error_mqh == NULL ||
     831 CBC          92 :                     pcxt->worker[i].bgwhandle == NULL ||
     832 GIC          46 :                     GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
     833                 :                                            &pid) != BGWH_STOPPED)
     834              48 :                     continue;
     835                 : 
     836                 :                 /*
     837                 :                  * Check whether the worker ended up stopped without ever
     838                 :                  * attaching to the error queue.  If so, the postmaster was
     839                 :                  * unable to fork the worker or it exited without initializing
     840                 :                  * properly.  We must throw an error, since the caller may
     841 ECB             :                  * have been expecting the worker to do some work before
     842                 :                  * exiting.
     843                 :                  */
     844 UIC           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     845 LBC           0 :                 if (shm_mq_get_sender(mq) == NULL)
     846 UIC           0 :                     ereport(ERROR,
     847                 :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     848                 :                              errmsg("parallel worker failed to initialize"),
     849                 :                              errhint("More details may be available in the server log.")));
     850                 : 
     851                 :                 /*
     852                 :                  * The worker is stopped, but is attached to the error queue.
     853                 :                  * Unless there's a bug somewhere, this will only happen when
     854                 :                  * the worker writes messages and terminates after the
     855 EUB             :                  * CHECK_FOR_INTERRUPTS() near the top of this function and
     856                 :                  * before the call to GetBackgroundWorkerPid().  In that case,
     857                 :                  * or latch should have been set as well and the right things
     858                 :                  * will happen on the next pass through the loop.
     859                 :                  */
     860                 :             }
     861                 :         }
     862                 : 
     863 GIC         569 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
     864                 :                          WAIT_EVENT_PARALLEL_FINISH);
     865             569 :         ResetLatch(MyLatch);
     866                 :     }
     867                 : 
     868             662 :     if (pcxt->toc != NULL)
     869                 :     {
     870                 :         FixedParallelState *fps;
     871                 : 
     872             662 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     873             662 :         if (fps->last_xlog_end > XactLastRecEnd)
     874 CBC          10 :             XactLastRecEnd = fps->last_xlog_end;
     875                 :     }
     876             662 : }
     877                 : 
     878                 : /*
     879 ECB             :  * Wait for all workers to exit.
     880                 :  *
     881                 :  * This function ensures that workers have been completely shutdown.  The
     882                 :  * difference between WaitForParallelWorkersToFinish and this function is
     883                 :  * that the former just ensures that last message sent by a worker backend is
     884                 :  * received by the leader backend whereas this ensures the complete shutdown.
     885                 :  */
     886                 : static void
     887 CBC         534 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
     888                 : {
     889                 :     int         i;
     890                 : 
     891                 :     /* Wait until the workers actually die. */
     892 GIC        1832 :     for (i = 0; i < pcxt->nworkers_launched; ++i)
     893                 :     {
     894                 :         BgwHandleStatus status;
     895                 : 
     896            1298 :         if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
     897 UIC           0 :             continue;
     898 ECB             : 
     899 GIC        1298 :         status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
     900                 : 
     901                 :         /*
     902                 :          * If the postmaster kicked the bucket, we have no chance of cleaning
     903 ECB             :          * up safely -- we won't be able to tell when our workers are actually
     904                 :          * dead.  This doesn't necessitate a PANIC since they will all abort
     905                 :          * eventually, but we can't safely continue this session.
     906                 :          */
     907 CBC        1298 :         if (status == BGWH_POSTMASTER_DIED)
     908 UBC           0 :             ereport(FATAL,
     909                 :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     910 ECB             :                      errmsg("postmaster exited during a parallel transaction")));
     911                 : 
     912                 :         /* Release memory. */
     913 GIC        1298 :         pfree(pcxt->worker[i].bgwhandle);
     914            1298 :         pcxt->worker[i].bgwhandle = NULL;
     915                 :     }
     916             534 : }
     917                 : 
     918 ECB             : /*
     919 EUB             :  * Destroy a parallel context.
     920                 :  *
     921                 :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
     922                 :  * first, before calling this function.  When this function is invoked, any
     923                 :  * remaining workers are forcibly killed; the dynamic shared memory segment
     924 ECB             :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
     925                 :  */
     926                 : void
     927 CBC         403 : DestroyParallelContext(ParallelContext *pcxt)
     928                 : {
     929                 :     int         i;
     930                 : 
     931                 :     /*
     932                 :      * Be careful about order of operations here!  We remove the parallel
     933                 :      * context from the list before we do anything else; otherwise, if an
     934                 :      * error occurs during a subsequent step, we might try to nuke it again
     935                 :      * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     936                 :      */
     937 GIC         403 :     dlist_delete(&pcxt->node);
     938 ECB             : 
     939                 :     /* Kill each worker in turn, and forget their error queues. */
     940 GIC         403 :     if (pcxt->worker != NULL)
     941                 :     {
     942            1286 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     943                 :         {
     944             883 :             if (pcxt->worker[i].error_mqh != NULL)
     945                 :             {
     946               3 :                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
     947                 : 
     948 CBC           3 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
     949 GIC           3 :                 pcxt->worker[i].error_mqh = NULL;
     950                 :             }
     951 ECB             :         }
     952                 :     }
     953                 : 
     954                 :     /*
     955                 :      * If we have allocated a shared memory segment, detach it.  This will
     956                 :      * implicitly detach the error queues, and any other shared memory queues,
     957                 :      * stored there.
     958                 :      */
     959 CBC         403 :     if (pcxt->seg != NULL)
     960 ECB             :     {
     961 GIC         403 :         dsm_detach(pcxt->seg);
     962             403 :         pcxt->seg = NULL;
     963                 :     }
     964                 : 
     965                 :     /*
     966                 :      * If this parallel context is actually in backend-private memory rather
     967                 :      * than shared memory, free that memory instead.
     968                 :      */
     969             403 :     if (pcxt->private_memory != NULL)
     970 ECB             :     {
     971 UIC           0 :         pfree(pcxt->private_memory);
     972 LBC           0 :         pcxt->private_memory = NULL;
     973 ECB             :     }
     974                 : 
     975                 :     /*
     976                 :      * We can't finish transaction commit or abort until all of the workers
     977                 :      * have exited.  This means, in particular, that we can't respond to
     978                 :      * interrupts at this stage.
     979                 :      */
     980 CBC         403 :     HOLD_INTERRUPTS();
     981 GIC         403 :     WaitForParallelWorkersToExit(pcxt);
     982 GBC         403 :     RESUME_INTERRUPTS();
     983 EUB             : 
     984                 :     /* Free the worker array itself. */
     985 GIC         403 :     if (pcxt->worker != NULL)
     986                 :     {
     987             403 :         pfree(pcxt->worker);
     988             403 :         pcxt->worker = NULL;
     989                 :     }
     990                 : 
     991 ECB             :     /* Free memory. */
     992 CBC         403 :     pfree(pcxt->library_name);
     993             403 :     pfree(pcxt->function_name);
     994 GIC         403 :     pfree(pcxt);
     995             403 : }
     996 ECB             : 
     997                 : /*
     998                 :  * Are there any parallel contexts currently active?
     999                 :  */
    1000                 : bool
    1001 GIC      483208 : ParallelContextActive(void)
    1002                 : {
    1003 CBC      483208 :     return !dlist_is_empty(&pcxt_list);
    1004 ECB             : }
    1005                 : 
    1006                 : /*
    1007                 :  * Handle receipt of an interrupt indicating a parallel worker message.
    1008                 :  *
    1009                 :  * Note: this is called within a signal handler!  All we can do is set
    1010                 :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
    1011                 :  * HandleParallelMessages().
    1012                 :  */
    1013                 : void
    1014 CBC        2711 : HandleParallelMessageInterrupt(void)
    1015                 : {
    1016 GIC        2711 :     InterruptPending = true;
    1017            2711 :     ParallelMessagePending = true;
    1018            2711 :     SetLatch(MyLatch);
    1019            2711 : }
    1020                 : 
    1021                 : /*
    1022                 :  * Handle any queued protocol messages received from parallel workers.
    1023                 :  */
    1024                 : void
    1025 CBC        2606 : HandleParallelMessages(void)
    1026                 : {
    1027 ECB             :     dlist_iter  iter;
    1028                 :     MemoryContext oldcontext;
    1029                 : 
    1030                 :     static MemoryContext hpm_context = NULL;
    1031                 : 
    1032                 :     /*
    1033                 :      * This is invoked from ProcessInterrupts(), and since some of the
    1034                 :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1035                 :      * for recursive calls if more signals are received while this runs.  It's
    1036                 :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1037                 :      * even if it is safe, so let's block interrupts until done.
    1038                 :      */
    1039 GIC        2606 :     HOLD_INTERRUPTS();
    1040                 : 
    1041                 :     /*
    1042                 :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
    1043                 :      * don't want to risk leaking data into long-lived contexts, so let's do
    1044                 :      * our work here in a private context that we can reset on each use.
    1045                 :      */
    1046            2606 :     if (hpm_context == NULL)    /* first time through? */
    1047              55 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
    1048                 :                                             "HandleParallelMessages",
    1049                 :                                             ALLOCSET_DEFAULT_SIZES);
    1050 ECB             :     else
    1051 GIC        2551 :         MemoryContextReset(hpm_context);
    1052                 : 
    1053            2606 :     oldcontext = MemoryContextSwitchTo(hpm_context);
    1054                 : 
    1055                 :     /* OK to process messages.  Reset the flag saying there are more to do. */
    1056            2606 :     ParallelMessagePending = false;
    1057 ECB             : 
    1058 CBC        5215 :     dlist_foreach(iter, &pcxt_list)
    1059                 :     {
    1060                 :         ParallelContext *pcxt;
    1061                 :         int         i;
    1062 ECB             : 
    1063 GIC        2612 :         pcxt = dlist_container(ParallelContext, node, iter.cur);
    1064 CBC        2612 :         if (pcxt->worker == NULL)
    1065 UIC           0 :             continue;
    1066                 : 
    1067 CBC       10396 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
    1068                 :         {
    1069 ECB             :             /*
    1070                 :              * Read as many messages as we can from each worker, but stop when
    1071                 :              * either (1) the worker's error queue goes away, which can happen
    1072                 :              * if we receive a Terminate message from the worker; or (2) no
    1073                 :              * more messages can be read from the worker without blocking.
    1074                 :              */
    1075 CBC       10380 :             while (pcxt->worker[i].error_mqh != NULL)
    1076 EUB             :             {
    1077                 :                 shm_mq_result res;
    1078 ECB             :                 Size        nbytes;
    1079                 :                 void       *data;
    1080                 : 
    1081 GIC        7560 :                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
    1082                 :                                      &data, true);
    1083            7560 :                 if (res == SHM_MQ_WOULD_BLOCK)
    1084            4964 :                     break;
    1085            2596 :                 else if (res == SHM_MQ_SUCCESS)
    1086 ECB             :                 {
    1087                 :                     StringInfoData msg;
    1088                 : 
    1089 GIC        2596 :                     initStringInfo(&msg);
    1090            2596 :                     appendBinaryStringInfo(&msg, data, nbytes);
    1091            2596 :                     HandleParallelMessage(pcxt, i, &msg);
    1092 CBC        2593 :                     pfree(msg.data);
    1093                 :                 }
    1094 ECB             :                 else
    1095 LBC           0 :                     ereport(ERROR,
    1096 ECB             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1097                 :                              errmsg("lost connection to parallel worker")));
    1098                 :             }
    1099                 :         }
    1100                 :     }
    1101                 : 
    1102 CBC        2603 :     MemoryContextSwitchTo(oldcontext);
    1103 ECB             : 
    1104                 :     /* Might as well clear the context on our way out */
    1105 GIC        2603 :     MemoryContextReset(hpm_context);
    1106 EUB             : 
    1107 GIC        2603 :     RESUME_INTERRUPTS();
    1108            2603 : }
    1109                 : 
    1110                 : /*
    1111                 :  * Handle a single protocol message received from a single parallel worker.
    1112                 :  */
    1113 ECB             : static void
    1114 GIC        2596 : HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
    1115                 : {
    1116 ECB             :     char        msgtype;
    1117                 : 
    1118 CBC        2596 :     if (pcxt->known_attached_workers != NULL &&
    1119            2596 :         !pcxt->known_attached_workers[i])
    1120                 :     {
    1121 GIC        1241 :         pcxt->known_attached_workers[i] = true;
    1122            1241 :         pcxt->nknown_attached_workers++;
    1123                 :     }
    1124                 : 
    1125 CBC        2596 :     msgtype = pq_getmsgbyte(msg);
    1126                 : 
    1127 GIC        2596 :     switch (msgtype)
    1128                 :     {
    1129 CBC        1298 :         case 'K':               /* BackendKeyData */
    1130 ECB             :             {
    1131 GIC        1298 :                 int32       pid = pq_getmsgint(msg, 4);
    1132 ECB             : 
    1133 CBC        1298 :                 (void) pq_getmsgint(msg, 4);    /* discard cancel key */
    1134 GIC        1298 :                 (void) pq_getmsgend(msg);
    1135            1298 :                 pcxt->worker[i].pid = pid;
    1136 CBC        1298 :                 break;
    1137                 :             }
    1138 ECB             : 
    1139 GIC           3 :         case 'E':               /* ErrorResponse */
    1140 ECB             :         case 'N':               /* NoticeResponse */
    1141                 :             {
    1142                 :                 ErrorData   edata;
    1143                 :                 ErrorContextCallback *save_error_context_stack;
    1144                 : 
    1145                 :                 /* Parse ErrorResponse or NoticeResponse. */
    1146 CBC           3 :                 pq_parse_errornotice(msg, &edata);
    1147 ECB             : 
    1148                 :                 /* Death of a worker isn't enough justification for suicide. */
    1149 GIC           3 :                 edata.elevel = Min(edata.elevel, ERROR);
    1150 ECB             : 
    1151                 :                 /*
    1152                 :                  * If desired, add a context line to show that this is a
    1153                 :                  * message propagated from a parallel worker.  Otherwise, it
    1154                 :                  * can sometimes be confusing to understand what actually
    1155                 :                  * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
    1156                 :                  * because it causes test-result instability depending on
    1157                 :                  * whether a parallel worker is actually used or not.)
    1158                 :                  */
    1159 GNC           3 :                 if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
    1160 ECB             :                 {
    1161 GIC           3 :                     if (edata.context)
    1162 UIC           0 :                         edata.context = psprintf("%s\n%s", edata.context,
    1163                 :                                                  _("parallel worker"));
    1164                 :                     else
    1165 GIC           3 :                         edata.context = pstrdup(_("parallel worker"));
    1166                 :                 }
    1167                 : 
    1168                 :                 /*
    1169                 :                  * Context beyond that should use the error context callbacks
    1170 ECB             :                  * that were in effect when the ParallelContext was created,
    1171                 :                  * not the current ones.
    1172                 :                  */
    1173 GBC           3 :                 save_error_context_stack = error_context_stack;
    1174 GIC           3 :                 error_context_stack = pcxt->error_context_stack;
    1175                 : 
    1176 ECB             :                 /* Rethrow error or print notice. */
    1177 GIC           3 :                 ThrowErrorData(&edata);
    1178                 : 
    1179                 :                 /* Not an error, so restore previous context stack. */
    1180 UIC           0 :                 error_context_stack = save_error_context_stack;
    1181                 : 
    1182               0 :                 break;
    1183                 :             }
    1184 ECB             : 
    1185 LBC           0 :         case 'A':               /* NotifyResponse */
    1186                 :             {
    1187                 :                 /* Propagate NotifyResponse. */
    1188 ECB             :                 int32       pid;
    1189                 :                 const char *channel;
    1190                 :                 const char *payload;
    1191 EUB             : 
    1192 UIC           0 :                 pid = pq_getmsgint(msg, 4);
    1193 UBC           0 :                 channel = pq_getmsgrawstring(msg);
    1194 UIC           0 :                 payload = pq_getmsgrawstring(msg);
    1195               0 :                 pq_endmessage(msg);
    1196 EUB             : 
    1197 UIC           0 :                 NotifyMyFrontEnd(channel, payload, pid);
    1198                 : 
    1199               0 :                 break;
    1200                 :             }
    1201                 : 
    1202 GIC        1295 :         case 'X':               /* Terminate, indicating clean exit */
    1203 EUB             :             {
    1204 GBC        1295 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
    1205            1295 :                 pcxt->worker[i].error_mqh = NULL;
    1206            1295 :                 break;
    1207                 :             }
    1208 EUB             : 
    1209 UIC           0 :         default:
    1210 EUB             :             {
    1211 UIC           0 :                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
    1212                 :                      msgtype, msg->len);
    1213 ECB             :             }
    1214                 :     }
    1215 CBC        2593 : }
    1216 ECB             : 
    1217                 : /*
    1218                 :  * End-of-subtransaction cleanup for parallel contexts.
    1219                 :  *
    1220 EUB             :  * Currently, it's forbidden to enter or leave a subtransaction while
    1221                 :  * parallel mode is in effect, so we could just blow away everything.  But
    1222                 :  * we may want to relax that restriction in the future, so this code
    1223                 :  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
    1224                 :  */
    1225                 : void
    1226 CBC           3 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
    1227                 : {
    1228 GIC           6 :     while (!dlist_is_empty(&pcxt_list))
    1229                 :     {
    1230                 :         ParallelContext *pcxt;
    1231                 : 
    1232               3 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1233               3 :         if (pcxt->subid != mySubId)
    1234 UIC           0 :             break;
    1235 GIC           3 :         if (isCommit)
    1236 UIC           0 :             elog(WARNING, "leaked parallel context");
    1237 CBC           3 :         DestroyParallelContext(pcxt);
    1238                 :     }
    1239               3 : }
    1240                 : 
    1241                 : /*
    1242                 :  * End-of-transaction cleanup for parallel contexts.
    1243 ECB             :  */
    1244                 : void
    1245 GBC        1298 : AtEOXact_Parallel(bool isCommit)
    1246 ECB             : {
    1247 GBC        1298 :     while (!dlist_is_empty(&pcxt_list))
    1248 ECB             :     {
    1249                 :         ParallelContext *pcxt;
    1250                 : 
    1251 UIC           0 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1252               0 :         if (isCommit)
    1253               0 :             elog(WARNING, "leaked parallel context");
    1254               0 :         DestroyParallelContext(pcxt);
    1255                 :     }
    1256 CBC        1298 : }
    1257                 : 
    1258 ECB             : /*
    1259                 :  * Main entrypoint for parallel workers.
    1260                 :  */
    1261                 : void
    1262 GBC        1298 : ParallelWorkerMain(Datum main_arg)
    1263 EUB             : {
    1264                 :     dsm_segment *seg;
    1265                 :     shm_toc    *toc;
    1266                 :     FixedParallelState *fps;
    1267 ECB             :     char       *error_queue_space;
    1268                 :     shm_mq     *mq;
    1269                 :     shm_mq_handle *mqh;
    1270                 :     char       *libraryspace;
    1271                 :     char       *entrypointstate;
    1272                 :     char       *library_name;
    1273                 :     char       *function_name;
    1274                 :     parallel_worker_main_type entrypt;
    1275                 :     char       *gucspace;
    1276                 :     char       *combocidspace;
    1277                 :     char       *tsnapspace;
    1278                 :     char       *asnapspace;
    1279                 :     char       *tstatespace;
    1280                 :     char       *pendingsyncsspace;
    1281                 :     char       *reindexspace;
    1282                 :     char       *relmapperspace;
    1283                 :     char       *uncommittedenumsspace;
    1284                 :     char       *clientconninfospace;
    1285                 :     StringInfoData msgbuf;
    1286                 :     char       *session_dsm_handle_space;
    1287                 :     Snapshot    tsnapshot;
    1288                 :     Snapshot    asnapshot;
    1289                 : 
    1290                 :     /* Set flag to indicate that we're initializing a parallel worker. */
    1291 GIC        1298 :     InitializingParallelWorker = true;
    1292                 : 
    1293                 :     /* Establish signal handlers. */
    1294            1298 :     pqsignal(SIGTERM, die);
    1295            1298 :     BackgroundWorkerUnblockSignals();
    1296                 : 
    1297                 :     /* Determine and set our parallel worker number. */
    1298            1298 :     Assert(ParallelWorkerNumber == -1);
    1299            1298 :     memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
    1300                 : 
    1301                 :     /* Set up a memory context to work in, just for cleanliness. */
    1302            1298 :     CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
    1303 ECB             :                                                  "Parallel worker",
    1304                 :                                                  ALLOCSET_DEFAULT_SIZES);
    1305                 : 
    1306                 :     /*
    1307                 :      * Attach to the dynamic shared memory segment for the parallel query, and
    1308                 :      * find its table of contents.
    1309                 :      *
    1310                 :      * Note: at this point, we have not created any ResourceOwner in this
    1311                 :      * process.  This will result in our DSM mapping surviving until process
    1312                 :      * exit, which is fine.  If there were a ResourceOwner, it would acquire
    1313                 :      * ownership of the mapping, but we have no need for that.
    1314                 :      */
    1315 GIC        1298 :     seg = dsm_attach(DatumGetUInt32(main_arg));
    1316            1298 :     if (seg == NULL)
    1317 UIC           0 :         ereport(ERROR,
    1318                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1319                 :                  errmsg("could not map dynamic shared memory segment")));
    1320 GIC        1298 :     toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    1321            1298 :     if (toc == NULL)
    1322 UIC           0 :         ereport(ERROR,
    1323                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1324                 :                  errmsg("invalid magic number in dynamic shared memory segment")));
    1325                 : 
    1326                 :     /* Look up fixed parallel state. */
    1327 CBC        1298 :     fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
    1328            1298 :     MyFixedParallelState = fps;
    1329 EUB             : 
    1330                 :     /* Arrange to signal the leader if we exit. */
    1331 GIC        1298 :     ParallelLeaderPid = fps->parallel_leader_pid;
    1332 CBC        1298 :     ParallelLeaderBackendId = fps->parallel_leader_backend_id;
    1333            1298 :     before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
    1334 EUB             : 
    1335                 :     /*
    1336                 :      * Now we can find and attach to the error queue provided for us.  That's
    1337                 :      * good, because until we do that, any errors that happen here will not be
    1338                 :      * reported back to the process that requested that this worker be
    1339 ECB             :      * launched.
    1340                 :      */
    1341 GIC        1298 :     error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
    1342            1298 :     mq = (shm_mq *) (error_queue_space +
    1343 CBC        1298 :                      ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
    1344            1298 :     shm_mq_set_sender(mq, MyProc);
    1345            1298 :     mqh = shm_mq_attach(mq, seg, NULL);
    1346 GIC        1298 :     pq_redirect_to_shm_mq(seg, mqh);
    1347            1298 :     pq_set_parallel_leader(fps->parallel_leader_pid,
    1348                 :                            fps->parallel_leader_backend_id);
    1349                 : 
    1350                 :     /*
    1351                 :      * Send a BackendKeyData message to the process that initiated parallelism
    1352                 :      * so that it has access to our PID before it receives any other messages
    1353 ECB             :      * from us.  Our cancel key is sent, too, since that's the way the
    1354                 :      * protocol message is defined, but it won't actually be used for anything
    1355                 :      * in this case.
    1356                 :      */
    1357 CBC        1298 :     pq_beginmessage(&msgbuf, 'K');
    1358            1298 :     pq_sendint32(&msgbuf, (int32) MyProcPid);
    1359            1298 :     pq_sendint32(&msgbuf, (int32) MyCancelKey);
    1360 GIC        1298 :     pq_endmessage(&msgbuf);
    1361                 : 
    1362                 :     /*
    1363                 :      * Hooray! Primary initialization is complete.  Now, we need to set up our
    1364                 :      * backend-local state to match the original backend.
    1365                 :      */
    1366                 : 
    1367                 :     /*
    1368                 :      * Join locking group.  We must do this before anything that could try to
    1369 ECB             :      * acquire a heavyweight lock, because any heavyweight locks acquired to
    1370                 :      * this point could block either directly against the parallel group
    1371                 :      * leader or against some process which in turn waits for a lock that
    1372                 :      * conflicts with the parallel group leader, causing an undetected
    1373                 :      * deadlock.  (If we can't join the lock group, the leader has gone away,
    1374                 :      * so just exit quietly.)
    1375                 :      */
    1376 GIC        1298 :     if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
    1377                 :                                fps->parallel_leader_pid))
    1378 UIC           0 :         return;
    1379                 : 
    1380                 :     /*
    1381                 :      * Restore transaction and statement start-time timestamps.  This must
    1382                 :      * happen before anything that would start a transaction, else asserts in
    1383                 :      * xact.c will fire.
    1384                 :      */
    1385 GIC        1298 :     SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
    1386                 : 
    1387                 :     /*
    1388 ECB             :      * Identify the entry point to be called.  In theory this could result in
    1389                 :      * loading an additional library, though most likely the entry point is in
    1390 EUB             :      * the core backend or in a library we just loaded.
    1391                 :      */
    1392 GIC        1298 :     entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
    1393            1298 :     library_name = entrypointstate;
    1394            1298 :     function_name = entrypointstate + strlen(library_name) + 1;
    1395                 : 
    1396            1298 :     entrypt = LookupParallelWorkerFunction(library_name, function_name);
    1397 ECB             : 
    1398                 :     /* Restore database connection. */
    1399 GIC        1298 :     BackgroundWorkerInitializeConnectionByOid(fps->database_id,
    1400                 :                                               fps->authenticated_user_id,
    1401                 :                                               0);
    1402                 : 
    1403                 :     /*
    1404 ECB             :      * Set the client encoding to the database encoding, since that is what
    1405                 :      * the leader will expect.
    1406                 :      */
    1407 GIC        1298 :     SetClientEncoding(GetDatabaseEncoding());
    1408 ECB             : 
    1409                 :     /*
    1410                 :      * Load libraries that were loaded by original backend.  We want to do
    1411                 :      * this before restoring GUCs, because the libraries might define custom
    1412                 :      * variables.
    1413                 :      */
    1414 GIC        1298 :     libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
    1415            1298 :     StartTransactionCommand();
    1416            1298 :     RestoreLibraryState(libraryspace);
    1417                 : 
    1418                 :     /* Restore GUC values from launching backend. */
    1419 CBC        1298 :     gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
    1420 GIC        1298 :     RestoreGUCState(gucspace);
    1421            1298 :     CommitTransactionCommand();
    1422                 : 
    1423                 :     /* Crank up a transaction state appropriate to a parallel worker. */
    1424            1298 :     tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
    1425            1298 :     StartParallelWorkerTransaction(tstatespace);
    1426 ECB             : 
    1427                 :     /* Restore combo CID state. */
    1428 CBC        1298 :     combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
    1429 GIC        1298 :     RestoreComboCIDState(combocidspace);
    1430                 : 
    1431 ECB             :     /* Attach to the per-session DSM segment and contained objects. */
    1432                 :     session_dsm_handle_space =
    1433 CBC        1298 :         shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
    1434 GIC        1298 :     AttachSession(*(dsm_handle *) session_dsm_handle_space);
    1435                 : 
    1436 ECB             :     /*
    1437                 :      * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
    1438                 :      * the leader has serialized the transaction snapshot and we must restore
    1439                 :      * it. At lower isolation levels, there is no transaction-lifetime
    1440                 :      * snapshot, but we need TransactionXmin to get set to a value which is
    1441                 :      * less than or equal to the xmin of every snapshot that will be used by
    1442                 :      * this worker. The easiest way to accomplish that is to install the
    1443                 :      * active snapshot as the transaction snapshot. Code running in this
    1444                 :      * parallel worker might take new snapshots via GetTransactionSnapshot()
    1445                 :      * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
    1446                 :      * snapshot older than the active snapshot.
    1447                 :      */
    1448 GIC        1298 :     asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
    1449            1298 :     tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
    1450            1298 :     asnapshot = RestoreSnapshot(asnapspace);
    1451            1298 :     tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
    1452            1298 :     RestoreTransactionSnapshot(tsnapshot,
    1453            1298 :                                fps->parallel_leader_pgproc);
    1454            1298 :     PushActiveSnapshot(asnapshot);
    1455                 : 
    1456                 :     /*
    1457                 :      * We've changed which tuples we can see, and must therefore invalidate
    1458                 :      * system caches.
    1459                 :      */
    1460 CBC        1298 :     InvalidateSystemCaches();
    1461 ECB             : 
    1462                 :     /*
    1463                 :      * Restore current role id.  Skip verifying whether session user is
    1464                 :      * allowed to become this role and blindly restore the leader's state for
    1465                 :      * current role.
    1466                 :      */
    1467 GIC        1298 :     SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);
    1468                 : 
    1469                 :     /* Restore user ID and security context. */
    1470            1298 :     SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
    1471                 : 
    1472 ECB             :     /* Restore temp-namespace state to ensure search path matches leader's. */
    1473 GIC        1298 :     SetTempNamespaceState(fps->temp_namespace_id,
    1474                 :                           fps->temp_toast_namespace_id);
    1475                 : 
    1476                 :     /* Restore pending syncs. */
    1477            1298 :     pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
    1478                 :                                        false);
    1479 CBC        1298 :     RestorePendingSyncs(pendingsyncsspace);
    1480                 : 
    1481                 :     /* Restore reindex state. */
    1482            1298 :     reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
    1483 GIC        1298 :     RestoreReindexState(reindexspace);
    1484                 : 
    1485 ECB             :     /* Restore relmapper state. */
    1486 GIC        1298 :     relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
    1487            1298 :     RestoreRelationMap(relmapperspace);
    1488                 : 
    1489 ECB             :     /* Restore uncommitted enums. */
    1490 GIC        1298 :     uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
    1491 ECB             :                                            false);
    1492 GIC        1298 :     RestoreUncommittedEnums(uncommittedenumsspace);
    1493                 : 
    1494                 :     /* Restore the ClientConnectionInfo. */
    1495 GNC        1298 :     clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
    1496                 :                                          false);
    1497            1298 :     RestoreClientConnectionInfo(clientconninfospace);
    1498                 : 
    1499                 :     /*
    1500                 :      * Initialize SystemUser now that MyClientConnectionInfo is restored.
    1501                 :      * Also ensure that auth_method is actually valid, aka authn_id is not NULL.
    1502                 :      */
    1503            1298 :     if (MyClientConnectionInfo.authn_id)
    1504               4 :         InitializeSystemUser(MyClientConnectionInfo.authn_id,
    1505                 :                              hba_authname(MyClientConnectionInfo.auth_method));
    1506                 : 
    1507 ECB             :     /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
    1508 CBC        1298 :     AttachSerializableXact(fps->serializable_xact_handle);
    1509                 : 
    1510                 :     /*
    1511 ECB             :      * We've initialized all of our state now; nothing should change
    1512                 :      * hereafter.
    1513                 :      */
    1514 GIC        1298 :     InitializingParallelWorker = false;
    1515 CBC        1298 :     EnterParallelMode();
    1516                 : 
    1517 ECB             :     /*
    1518                 :      * Time to do the real work: invoke the caller-supplied code.
    1519                 :      */
    1520 CBC        1298 :     entrypt(seg, toc);
    1521                 : 
    1522 ECB             :     /* Must exit parallel mode to pop active snapshot. */
    1523 GIC        1295 :     ExitParallelMode();
    1524                 : 
    1525                 :     /* Must pop active snapshot so snapmgr.c doesn't complain. */
    1526            1295 :     PopActiveSnapshot();
    1527                 : 
    1528 ECB             :     /* Shut down the parallel-worker transaction. */
    1529 CBC        1295 :     EndParallelWorkerTransaction();
    1530                 : 
    1531                 :     /* Detach from the per-session DSM segment. */
    1532 GIC        1295 :     DetachSession();
    1533 ECB             : 
    1534                 :     /* Report success. */
    1535 GIC        1295 :     pq_putmessage('X', NULL, 0);
    1536                 : }
    1537                 : 
    1538                 : /*
    1539 ECB             :  * Update shared memory with the ending location of the last WAL record we
    1540                 :  * wrote, if it's greater than the value already stored there.
    1541                 :  */
    1542                 : void
    1543 GIC        1295 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
    1544                 : {
    1545 CBC        1295 :     FixedParallelState *fps = MyFixedParallelState;
    1546                 : 
    1547 GIC        1295 :     Assert(fps != NULL);
    1548 CBC        1295 :     SpinLockAcquire(&fps->mutex);
    1549 GIC        1295 :     if (fps->last_xlog_end < last_xlog_end)
    1550              58 :         fps->last_xlog_end = last_xlog_end;
    1551 CBC        1295 :     SpinLockRelease(&fps->mutex);
    1552 GIC        1295 : }
    1553                 : 
    1554 ECB             : /*
    1555                 :  * Make sure the leader tries to read from our error queue one more time.
    1556                 :  * This guards against the case where we exit uncleanly without sending an
    1557                 :  * ErrorResponse to the leader, for example because some code calls proc_exit
    1558                 :  * directly.
    1559                 :  *
    1560                 :  * Also explicitly detach from dsm segment so that subsystems using
    1561                 :  * on_dsm_detach() have a chance to send stats before the stats subsystem is
    1562                 :  * shut down as part of a before_shmem_exit() hook.
    1563                 :  *
    1564                 :  * One might think this could instead be solved by carefully ordering the
    1565                 :  * attaching to dsm segments, so that the pgstats segments get detached from
    1566                 :  * later than the parallel query one. That turns out to not work because the
    1567                 :  * stats hash might need to grow which can cause new segments to be allocated,
    1568                 :  * which then will be detached from earlier.
    1569                 :  */
    1570                 : static void
    1571 GIC        1298 : ParallelWorkerShutdown(int code, Datum arg)
    1572 ECB             : {
    1573 CBC        1298 :     SendProcSignal(ParallelLeaderPid,
    1574 ECB             :                    PROCSIG_PARALLEL_MESSAGE,
    1575                 :                    ParallelLeaderBackendId);
    1576                 : 
    1577 CBC        1298 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
    1578 GIC        1298 : }
    1579                 : 
    1580                 : /*
    1581                 :  * Look up (and possibly load) a parallel worker entry point function.
    1582                 :  *
    1583                 :  * For functions contained in the core code, we use library name "postgres"
    1584                 :  * and consult the InternalParallelWorkers array.  External functions are
    1585                 :  * looked up, and loaded if necessary, using load_external_function().
    1586                 :  *
    1587                 :  * The point of this is to pass function names as strings across process
    1588                 :  * boundaries.  We can't pass actual function addresses because of the
    1589                 :  * possibility that the function has been loaded at a different address
    1590                 :  * in a different process.  This is obviously a hazard for functions in
    1591                 :  * loadable libraries, but it can happen even for functions in the core code
    1592                 :  * on platforms using EXEC_BACKEND (e.g., Windows).
    1593                 :  *
    1594                 :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
    1595                 :  * in favor of applying load_external_function() for core functions too;
    1596 ECB             :  * but that raises portability issues that are not worth addressing now.
    1597                 :  */
    1598                 : static parallel_worker_main_type
    1599 GIC        1298 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
    1600                 : {
    1601                 :     /*
    1602 ECB             :      * If the function is to be loaded from postgres itself, search the
    1603                 :      * InternalParallelWorkers array.
    1604                 :      */
    1605 GIC        1298 :     if (strcmp(libraryname, "postgres") == 0)
    1606                 :     {
    1607                 :         int         i;
    1608                 : 
    1609            1403 :         for (i = 0; i < lengthof(InternalParallelWorkers); i++)
    1610                 :         {
    1611            1403 :             if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
    1612            1298 :                 return InternalParallelWorkers[i].fn_addr;
    1613                 :         }
    1614                 : 
    1615                 :         /* We can only reach this by programming error. */
    1616 UIC           0 :         elog(ERROR, "internal function \"%s\" not found", funcname);
    1617                 :     }
    1618                 : 
    1619                 :     /* Otherwise load from external library. */
    1620               0 :     return (parallel_worker_main_type)
    1621               0 :         load_external_function(libraryname, funcname, true, NULL);
    1622                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a