LCOV - differential code coverage report
Current view: top level - src/backend/access/transam - parallel.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 90.3 % 487 440 7 38 2 10 266 12 152 35 280 1
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 19 19 19 19
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 100.0 % 1 1 1
Legend: Lines: hit not hit (180,240] days: 100.0 % 11 11 11
(240..) days: 90.1 % 475 428 7 38 2 10 266 152 32 275
Function coverage date bins:
(240..) days: 51.4 % 37 19 19 18

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * parallel.c
                                  4                 :  *    Infrastructure for launching parallel workers
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :  *
                                  9                 :  * IDENTIFICATION
                                 10                 :  *    src/backend/access/transam/parallel.c
                                 11                 :  *
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #include "postgres.h"
                                 16                 : 
                                 17                 : #include "access/nbtree.h"
                                 18                 : #include "access/parallel.h"
                                 19                 : #include "access/session.h"
                                 20                 : #include "access/xact.h"
                                 21                 : #include "access/xlog.h"
                                 22                 : #include "catalog/index.h"
                                 23                 : #include "catalog/namespace.h"
                                 24                 : #include "catalog/pg_enum.h"
                                 25                 : #include "catalog/storage.h"
                                 26                 : #include "commands/async.h"
                                 27                 : #include "commands/vacuum.h"
                                 28                 : #include "executor/execParallel.h"
                                 29                 : #include "libpq/libpq.h"
                                 30                 : #include "libpq/pqformat.h"
                                 31                 : #include "libpq/pqmq.h"
                                 32                 : #include "miscadmin.h"
                                 33                 : #include "optimizer/optimizer.h"
                                 34                 : #include "pgstat.h"
                                 35                 : #include "storage/ipc.h"
                                 36                 : #include "storage/predicate.h"
                                 37                 : #include "storage/sinval.h"
                                 38                 : #include "storage/spin.h"
                                 39                 : #include "tcop/tcopprot.h"
                                 40                 : #include "utils/combocid.h"
                                 41                 : #include "utils/guc.h"
                                 42                 : #include "utils/inval.h"
                                 43                 : #include "utils/memutils.h"
                                 44                 : #include "utils/relmapper.h"
                                 45                 : #include "utils/snapmgr.h"
                                 46                 : #include "utils/typcache.h"
                                 47                 : 
                                 48                 : /*
                                 49                 :  * We don't want to waste a lot of memory on an error queue which, most of
                                 50                 :  * the time, will process only a handful of small messages.  However, it is
                                 51                 :  * desirable to make it large enough that a typical ErrorResponse can be sent
                                 52                 :  * without blocking.  That way, a worker that errors out can write the whole
                                 53                 :  * message into the queue and terminate without waiting for the user backend.
                                 54                 :  */
                                 55                 : #define PARALLEL_ERROR_QUEUE_SIZE           16384
                                 56                 : 
                                 57                 : /* Magic number for parallel context TOC. */
                                 58                 : #define PARALLEL_MAGIC                      0x50477c7c
                                 59                 : 
                                 60                 : /*
                                 61                 :  * Magic numbers for per-context parallel state sharing.  Higher-level code
                                 62                 :  * should use smaller values, leaving these very large ones for use by this
                                 63                 :  * module.
                                 64                 :  */
                                 65                 : #define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
                                 66                 : #define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
                                 67                 : #define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
                                 68                 : #define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
                                 69                 : #define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
                                 70                 : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
                                 71                 : #define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
                                 72                 : #define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
                                 73                 : #define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
                                 74                 : #define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
                                 75                 : #define PARALLEL_KEY_PENDING_SYNCS          UINT64CONST(0xFFFFFFFFFFFF000B)
                                 76                 : #define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000C)
                                 77                 : #define PARALLEL_KEY_RELMAPPER_STATE        UINT64CONST(0xFFFFFFFFFFFF000D)
                                 78                 : #define PARALLEL_KEY_UNCOMMITTEDENUMS       UINT64CONST(0xFFFFFFFFFFFF000E)
                                 79                 : #define PARALLEL_KEY_CLIENTCONNINFO         UINT64CONST(0xFFFFFFFFFFFF000F)
                                 80                 : 
                                 81                 : /* Fixed-size parallel state. */
                                 82                 : typedef struct FixedParallelState
                                 83                 : {
                                 84                 :     /* Fixed-size state that workers must restore. */
                                 85                 :     Oid         database_id;
                                 86                 :     Oid         authenticated_user_id;
                                 87                 :     Oid         current_user_id;
                                 88                 :     Oid         outer_user_id;
                                 89                 :     Oid         temp_namespace_id;
                                 90                 :     Oid         temp_toast_namespace_id;
                                 91                 :     int         sec_context;
                                 92                 :     bool        is_superuser;
                                 93                 :     PGPROC     *parallel_leader_pgproc;
                                 94                 :     pid_t       parallel_leader_pid;
                                 95                 :     BackendId   parallel_leader_backend_id;
                                 96                 :     TimestampTz xact_ts;
                                 97                 :     TimestampTz stmt_ts;
                                 98                 :     SerializableXactHandle serializable_xact_handle;
                                 99                 : 
                                100                 :     /* Mutex protects remaining fields. */
                                101                 :     slock_t     mutex;
                                102                 : 
                                103                 :     /* Maximum XactLastRecEnd of any worker. */
                                104                 :     XLogRecPtr  last_xlog_end;
                                105                 : } FixedParallelState;
                                106                 : 
                                107                 : /*
                                108                 :  * Our parallel worker number.  We initialize this to -1, meaning that we are
                                109                 :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
                                110                 :  * and < the number of workers before any user code is invoked; each parallel
                                111                 :  * worker will get a different parallel worker number.
                                112                 :  */
                                113                 : int         ParallelWorkerNumber = -1;
                                114                 : 
                                115                 : /* Is there a parallel message pending which we need to receive? */
                                116                 : volatile sig_atomic_t ParallelMessagePending = false;
                                117                 : 
                                118                 : /* Are we initializing a parallel worker? */
                                119                 : bool        InitializingParallelWorker = false;
                                120                 : 
                                121                 : /* Pointer to our fixed parallel state. */
                                122                 : static FixedParallelState *MyFixedParallelState;
                                123                 : 
                                124                 : /* List of active parallel contexts. */
                                125                 : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
                                126                 : 
                                127                 : /* Backend-local copy of data from FixedParallelState. */
                                128                 : static pid_t ParallelLeaderPid;
                                129                 : 
                                130                 : /*
                                131                 :  * List of internal parallel worker entry points.  We need this for
                                132                 :  * reasons explained in LookupParallelWorkerFunction(), below.
                                133                 :  */
                                134                 : static const struct
                                135                 : {
                                136                 :     const char *fn_name;
                                137                 :     parallel_worker_main_type fn_addr;
                                138                 : }           InternalParallelWorkers[] =
                                139                 : 
                                140                 : {
                                141                 :     {
                                142                 :         "ParallelQueryMain", ParallelQueryMain
                                143                 :     },
                                144                 :     {
                                145                 :         "_bt_parallel_build_main", _bt_parallel_build_main
                                146                 :     },
                                147                 :     {
                                148                 :         "parallel_vacuum_main", parallel_vacuum_main
                                149                 :     }
                                150                 : };
                                151                 : 
                                152                 : /* Private functions. */
                                153                 : static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
                                154                 : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
                                155                 : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
                                156                 : static void ParallelWorkerShutdown(int code, Datum arg);
                                157                 : 
                                158                 : 
                                159                 : /*
                                160                 :  * Establish a new parallel context.  This should be done after entering
                                161                 :  * parallel mode, and (unless there is an error) the context should be
                                162                 :  * destroyed before exiting the current subtransaction.
                                163                 :  */
                                164                 : ParallelContext *
 2186 tgl                       165 GIC         403 : CreateParallelContext(const char *library_name, const char *function_name,
 1486 tmunro                    166 ECB             :                       int nworkers)
                                167                 : {
                                168                 :     MemoryContext oldcontext;
                                169                 :     ParallelContext *pcxt;
                                170                 : 
                                171                 :     /* It is unsafe to create a parallel context if not in parallel mode. */
 2901 rhaas                     172 GIC         403 :     Assert(IsInParallelMode());
 2901 rhaas                     173 ECB             : 
                                174                 :     /* Number of workers should be non-negative. */
 2901 rhaas                     175 GIC         403 :     Assert(nworkers >= 0);
 2901 rhaas                     176 ECB             : 
                                177                 :     /* We might be running in a short-lived memory context. */
 2901 rhaas                     178 GIC         403 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 2901 rhaas                     179 ECB             : 
                                180                 :     /* Initialize a new ParallelContext. */
 2901 rhaas                     181 GIC         403 :     pcxt = palloc0(sizeof(ParallelContext));
 2901 rhaas                     182 CBC         403 :     pcxt->subid = GetCurrentSubTransactionId();
                                183             403 :     pcxt->nworkers = nworkers;
 1175 akapila                   184             403 :     pcxt->nworkers_to_launch = nworkers;
 2186 tgl                       185             403 :     pcxt->library_name = pstrdup(library_name);
                                186             403 :     pcxt->function_name = pstrdup(function_name);
 2901 rhaas                     187             403 :     pcxt->error_context_stack = error_context_stack;
                                188             403 :     shm_toc_initialize_estimator(&pcxt->estimator);
                                189             403 :     dlist_push_head(&pcxt_list, &pcxt->node);
 2901 rhaas                     190 ECB             : 
                                191                 :     /* Restore previous memory context. */
 2901 rhaas                     192 GIC         403 :     MemoryContextSwitchTo(oldcontext);
 2901 rhaas                     193 ECB             : 
 2901 rhaas                     194 GIC         403 :     return pcxt;
 2901 rhaas                     195 ECB             : }
                                196                 : 
                                197                 : /*
                                198                 :  * Establish the dynamic shared memory segment for a parallel context and
                                199                 :  * copy state and other bookkeeping information that will be needed by
                                200                 :  * parallel workers into it.
                                201                 :  */
                                202                 : void
 2901 rhaas                     203 GIC         403 : InitializeParallelDSM(ParallelContext *pcxt)
 2901 rhaas                     204 ECB             : {
                                205                 :     MemoryContext oldcontext;
 2878 bruce                     206 GIC         403 :     Size        library_len = 0;
 2878 bruce                     207 CBC         403 :     Size        guc_len = 0;
                                208             403 :     Size        combocidlen = 0;
                                209             403 :     Size        tsnaplen = 0;
                                210             403 :     Size        asnaplen = 0;
                                211             403 :     Size        tstatelen = 0;
 1100 noah                      212             403 :     Size        pendingsyncslen = 0;
 1906 rhaas                     213             403 :     Size        reindexlen = 0;
 1703 pg                        214             403 :     Size        relmapperlen = 0;
  824 tmunro                    215             403 :     Size        uncommittedenumslen = 0;
  228 michael                   216 GNC         403 :     Size        clientconninfolen = 0;
 2878 bruce                     217 CBC         403 :     Size        segsize = 0;
 2878 bruce                     218 ECB             :     int         i;
 2901 rhaas                     219                 :     FixedParallelState *fps;
 2033 andres                    220 GIC         403 :     dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
 2901 rhaas                     221             403 :     Snapshot    transaction_snapshot = GetTransactionSnapshot();
 2901 rhaas                     222 CBC         403 :     Snapshot    active_snapshot = GetActiveSnapshot();
 2901 rhaas                     223 ECB             : 
                                224                 :     /* We might be running in a very short-lived memory context. */
 2901 rhaas                     225 GIC         403 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
                                226                 : 
 2901 rhaas                     227 ECB             :     /* Allow space to store the fixed-size parallel state. */
 2901 rhaas                     228 GIC         403 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
                                229             403 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
 2901 rhaas                     230 ECB             : 
                                231                 :     /*
                                232                 :      * Normally, the user will have requested at least one worker process, but
                                233                 :      * if by chance they have not, we can skip a bunch of things here.
                                234                 :      */
 2033 andres                    235 GIC         403 :     if (pcxt->nworkers > 0)
                                236                 :     {
 2033 andres                    237 ECB             :         /* Get (or create) the per-session DSM segment's handle. */
 2033 andres                    238 GIC         403 :         session_dsm_handle = GetSessionDsmHandle();
                                239                 : 
 2033 andres                    240 ECB             :         /*
                                241                 :          * If we weren't able to create a per-session DSM segment, then we can
                                242                 :          * continue but we can't safely launch any workers because their
                                243                 :          * record typmods would be incompatible so they couldn't exchange
                                244                 :          * tuples.
                                245                 :          */
 2033 andres                    246 GIC         403 :         if (session_dsm_handle == DSM_HANDLE_INVALID)
 2033 andres                    247 UIC           0 :             pcxt->nworkers = 0;
 2033 andres                    248 ECB             :     }
 2033 andres                    249 EUB             : 
 2901 rhaas                     250 GIC         403 :     if (pcxt->nworkers > 0)
                                251                 :     {
 2901 rhaas                     252 ECB             :         /* Estimate space for various kinds of state sharing. */
 2901 rhaas                     253 GIC         403 :         library_len = EstimateLibraryStateSpace();
                                254             403 :         shm_toc_estimate_chunk(&pcxt->estimator, library_len);
 2901 rhaas                     255 CBC         403 :         guc_len = EstimateGUCStateSpace();
                                256             403 :         shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
                                257             403 :         combocidlen = EstimateComboCIDStateSpace();
                                258             403 :         shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
  592                           259             403 :         if (IsolationUsesXactSnapshot())
  592 rhaas                     260 ECB             :         {
  592 rhaas                     261 CBC          11 :             tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
  592 rhaas                     262 GIC          11 :             shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
  592 rhaas                     263 ECB             :         }
 2901 rhaas                     264 CBC         403 :         asnaplen = EstimateSnapshotSpace(active_snapshot);
 2901 rhaas                     265 GIC         403 :         shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
 2901 rhaas                     266 CBC         403 :         tstatelen = EstimateTransactionStateSpace();
                                267             403 :         shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
 2033 andres                    268             403 :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
 1100 noah                      269             403 :         pendingsyncslen = EstimatePendingSyncsSpace();
                                270             403 :         shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
 1906 rhaas                     271             403 :         reindexlen = EstimateReindexStateSpace();
                                272             403 :         shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
 1703 pg                        273             403 :         relmapperlen = EstimateRelationMapSpace();
                                274             403 :         shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
  824 tmunro                    275             403 :         uncommittedenumslen = EstimateUncommittedEnumsSpace();
                                276             403 :         shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
  228 michael                   277 GNC         403 :         clientconninfolen = EstimateClientConnectionInfoSpace();
                                278             403 :         shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
 2901 rhaas                     279 ECB             :         /* If you add more chunks here, you probably need to add keys. */
  228 michael                   280 GNC         403 :         shm_toc_estimate_keys(&pcxt->estimator, 12);
 2901 rhaas                     281 ECB             : 
                                282                 :         /* Estimate space need for error queues. */
                                283                 :         StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
 2878 bruce                     284                 :                          PARALLEL_ERROR_QUEUE_SIZE,
                                285                 :                          "parallel error queue size not buffer-aligned");
 2901 rhaas                     286 GIC         403 :         shm_toc_estimate_chunk(&pcxt->estimator,
                                287                 :                                mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                288                 :                                         pcxt->nworkers));
                                289             403 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
 2901 rhaas                     290 ECB             : 
                                291                 :         /* Estimate how much we'll need for the entrypoint info. */
 2186 tgl                       292 GIC         403 :         shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
 2186 tgl                       293 ECB             :                                strlen(pcxt->function_name) + 2);
 2186 tgl                       294 GIC         403 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
                                295                 :     }
 2901 rhaas                     296 ECB             : 
                                297                 :     /*
                                298                 :      * Create DSM and initialize with new table of contents.  But if the user
                                299                 :      * didn't request any workers, then don't bother creating a dynamic shared
                                300                 :      * memory segment; instead, just use backend-private memory.
                                301                 :      *
                                302                 :      * Also, if we can't create a dynamic shared memory segment because the
                                303                 :      * maximum number of segments have already been created, then fall back to
                                304                 :      * backend-private memory, and plan not to use any workers.  We hope this
                                305                 :      * won't happen very often, but it's better to abandon the use of
                                306                 :      * parallelism than to fail outright.
                                307                 :      */
 2901 rhaas                     308 GIC         403 :     segsize = shm_toc_estimate(&pcxt->estimator);
 2488 tgl                       309             403 :     if (pcxt->nworkers > 0)
 2901 rhaas                     310             403 :         pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
                                311             403 :     if (pcxt->seg != NULL)
 2901 rhaas                     312 CBC         403 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
 2901 rhaas                     313 ECB             :                                    dsm_segment_address(pcxt->seg),
                                314                 :                                    segsize);
                                315                 :     else
                                316                 :     {
 2901 rhaas                     317 UIC           0 :         pcxt->nworkers = 0;
 2896                           318               0 :         pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
                                319               0 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
                                320                 :                                    segsize);
 2901 rhaas                     321 EUB             :     }
                                322                 : 
                                323                 :     /* Initialize fixed-size state in shared memory. */
                                324                 :     fps = (FixedParallelState *)
 2901 rhaas                     325 GIC         403 :         shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
                                326             403 :     fps->database_id = MyDatabaseId;
                                327             403 :     fps->authenticated_user_id = GetAuthenticatedUserId();
 1988                           328             403 :     fps->outer_user_id = GetCurrentRoleId();
 1988 rhaas                     329 CBC         403 :     fps->is_superuser = session_auth_is_superuser;
 2901                           330             403 :     GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
 2495 tgl                       331             403 :     GetTempNamespaceState(&fps->temp_namespace_id,
 2495 tgl                       332 ECB             :                           &fps->temp_toast_namespace_id);
 1029 andres                    333 CBC         403 :     fps->parallel_leader_pgproc = MyProc;
                                334             403 :     fps->parallel_leader_pid = MyProcPid;
                                335             403 :     fps->parallel_leader_backend_id = MyBackendId;
 1646 tgl                       336 GIC         403 :     fps->xact_ts = GetCurrentTransactionStartTimestamp();
 1646 tgl                       337 CBC         403 :     fps->stmt_ts = GetCurrentStatementStartTimestamp();
 1486 tmunro                    338             403 :     fps->serializable_xact_handle = ShareSerializableXact();
 2901 rhaas                     339             403 :     SpinLockInit(&fps->mutex);
                                340             403 :     fps->last_xlog_end = 0;
                                341             403 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
 2901 rhaas                     342 ECB             : 
                                343                 :     /* We can skip the rest of this if we're not budgeting for any workers. */
 2901 rhaas                     344 CBC         403 :     if (pcxt->nworkers > 0)
 2901 rhaas                     345 ECB             :     {
                                346                 :         char       *libraryspace;
                                347                 :         char       *gucspace;
 2878 bruce                     348                 :         char       *combocidspace;
                                349                 :         char       *tsnapspace;
                                350                 :         char       *asnapspace;
                                351                 :         char       *tstatespace;
                                352                 :         char       *pendingsyncsspace;
                                353                 :         char       *reindexspace;
                                354                 :         char       *relmapperspace;
                                355                 :         char       *error_queue_space;
                                356                 :         char       *session_dsm_handle_space;
                                357                 :         char       *entrypointstate;
                                358                 :         char       *uncommittedenumsspace;
                                359                 :         char       *clientconninfospace;
                                360                 :         Size        lnamelen;
                                361                 : 
                                362                 :         /* Serialize shared libraries we have loaded. */
 2901 rhaas                     363 GIC         403 :         libraryspace = shm_toc_allocate(pcxt->toc, library_len);
                                364             403 :         SerializeLibraryState(library_len, libraryspace);
                                365             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
                                366                 : 
                                367                 :         /* Serialize GUC settings. */
 2901 rhaas                     368 CBC         403 :         gucspace = shm_toc_allocate(pcxt->toc, guc_len);
                                369             403 :         SerializeGUCState(guc_len, gucspace);
                                370             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
                                371                 : 
                                372                 :         /* Serialize combo CID state. */
                                373             403 :         combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
                                374             403 :         SerializeComboCIDState(combocidlen, combocidspace);
                                375             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
                                376                 : 
                                377                 :         /*
  592 rhaas                     378 ECB             :          * Serialize the transaction snapshot if the transaction
                                379                 :          * isolation-level uses a transaction snapshot.
                                380                 :          */
  592 rhaas                     381 GIC         403 :         if (IsolationUsesXactSnapshot())
                                382                 :         {
                                383              11 :             tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
                                384              11 :             SerializeSnapshot(transaction_snapshot, tsnapspace);
                                385              11 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
  592 rhaas                     386 ECB             :                            tsnapspace);
                                387                 :         }
                                388                 : 
                                389                 :         /* Serialize the active snapshot. */
 2901 rhaas                     390 CBC         403 :         asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
 2901 rhaas                     391 GIC         403 :         SerializeSnapshot(active_snapshot, asnapspace);
                                392             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
                                393                 : 
                                394                 :         /* Provide the handle for per-session segment. */
 2033 andres                    395 CBC         403 :         session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
 2033 andres                    396 ECB             :                                                     sizeof(dsm_handle));
 2033 andres                    397 CBC         403 :         *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
 2033 andres                    398 GIC         403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
                                399                 :                        session_dsm_handle_space);
 2033 andres                    400 ECB             : 
                                401                 :         /* Serialize transaction state. */
 2901 rhaas                     402 CBC         403 :         tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
                                403             403 :         SerializeTransactionState(tstatelen, tstatespace);
 2901 rhaas                     404 GIC         403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
                                405                 : 
                                406                 :         /* Serialize pending syncs. */
 1100 noah                      407 CBC         403 :         pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
                                408             403 :         SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
                                409             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
                                410                 :                        pendingsyncsspace);
                                411                 : 
 1906 rhaas                     412 ECB             :         /* Serialize reindex state. */
 1906 rhaas                     413 CBC         403 :         reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
                                414             403 :         SerializeReindexState(reindexlen, reindexspace);
 1906 rhaas                     415 GIC         403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
                                416                 : 
                                417                 :         /* Serialize relmapper state. */
 1703 pg                        418 CBC         403 :         relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
                                419             403 :         SerializeRelationMap(relmapperlen, relmapperspace);
                                420             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
                                421                 :                        relmapperspace);
                                422                 : 
  824 tmunro                    423 ECB             :         /* Serialize uncommitted enum state. */
  824 tmunro                    424 CBC         403 :         uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
  824 tmunro                    425 ECB             :                                                  uncommittedenumslen);
  824 tmunro                    426 GIC         403 :         SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
                                427             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
                                428                 :                        uncommittedenumsspace);
 1643 tmunro                    429 ECB             : 
                                430                 :         /* Serialize our ClientConnectionInfo. */
  228 michael                   431 GNC         403 :         clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
                                432             403 :         SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
                                433             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
                                434                 :                        clientconninfospace);
                                435                 : 
                                436                 :         /* Allocate space for worker information. */
 2901 rhaas                     437 CBC         403 :         pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
 2901 rhaas                     438 ECB             : 
                                439                 :         /*
                                440                 :          * Establish error queues in dynamic shared memory.
                                441                 :          *
                                442                 :          * These queues should be used only for transmitting ErrorResponse,
                                443                 :          * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
                                444                 :          * should be transmitted via separate (possibly larger?) queues.
                                445                 :          */
                                446                 :         error_queue_space =
 2878 bruce                     447 GIC         403 :             shm_toc_allocate(pcxt->toc,
 2529 rhaas                     448 ECB             :                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
 2529 rhaas                     449 GIC         403 :                                       pcxt->nworkers));
 2901                           450            1326 :         for (i = 0; i < pcxt->nworkers; ++i)
                                451                 :         {
                                452                 :             char       *start;
                                453                 :             shm_mq     *mq;
                                454                 : 
                                455             923 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
                                456             923 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
                                457             923 :             shm_mq_set_receiver(mq, MyProc);
 2901 rhaas                     458 CBC         923 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
                                459                 :         }
                                460             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
 2901 rhaas                     461 ECB             : 
                                462                 :         /*
                                463                 :          * Serialize entrypoint information.  It's unsafe to pass function
                                464                 :          * pointers across processes, as the function pointer may be different
                                465                 :          * in each process in EXEC_BACKEND builds, so we always pass library
 2186 tgl                       466                 :          * and function name.  (We use library name "postgres" for functions
                                467                 :          * in the core backend.)
                                468                 :          */
 2186 tgl                       469 CBC         403 :         lnamelen = strlen(pcxt->library_name);
 2186 tgl                       470 GIC         403 :         entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
 2186 tgl                       471 CBC         403 :                                            strlen(pcxt->function_name) + 2);
 2186 tgl                       472 GIC         403 :         strcpy(entrypointstate, pcxt->library_name);
                                473             403 :         strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
                                474             403 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
                                475                 :     }
                                476                 : 
                                477                 :     /* Restore previous memory context. */
 2901 rhaas                     478             403 :     MemoryContextSwitchTo(oldcontext);
                                479             403 : }
 2901 rhaas                     480 ECB             : 
 2718                           481                 : /*
                                482                 :  * Reinitialize the dynamic shared memory segment for a parallel context such
                                483                 :  * that we could launch workers for it again.
                                484                 :  */
                                485                 : void
 2718 rhaas                     486 GIC         131 : ReinitializeParallelDSM(ParallelContext *pcxt)
                                487                 : {
                                488                 :     FixedParallelState *fps;
 2718 rhaas                     489 ECB             : 
 2488 tgl                       490                 :     /* Wait for any old workers to exit. */
 2488 tgl                       491 GIC         131 :     if (pcxt->nworkers_launched > 0)
                                492                 :     {
                                493             131 :         WaitForParallelWorkersToFinish(pcxt);
                                494             131 :         WaitForParallelWorkersToExit(pcxt);
                                495             131 :         pcxt->nworkers_launched = 0;
 1892 rhaas                     496             131 :         if (pcxt->known_attached_workers)
 1902 rhaas                     497 ECB             :         {
 1892 rhaas                     498 GIC         131 :             pfree(pcxt->known_attached_workers);
                                499             131 :             pcxt->known_attached_workers = NULL;
                                500             131 :             pcxt->nknown_attached_workers = 0;
                                501                 :         }
 2488 tgl                       502 ECB             :     }
                                503                 : 
 2718 rhaas                     504                 :     /* Reset a few bits of fixed parallel state to a clean state. */
 2134 tgl                       505 CBC         131 :     fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
 2718 rhaas                     506             131 :     fps->last_xlog_end = 0;
 2718 rhaas                     507 ECB             : 
                                508                 :     /* Recreate error queues (if they exist). */
 1892 tgl                       509 CBC         131 :     if (pcxt->nworkers > 0)
 2718 rhaas                     510 ECB             :     {
 1892 tgl                       511                 :         char       *error_queue_space;
                                512                 :         int         i;
                                513                 : 
                                514                 :         error_queue_space =
 1892 tgl                       515 GIC         131 :             shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
 1892 tgl                       516 CBC         546 :         for (i = 0; i < pcxt->nworkers; ++i)
 1892 tgl                       517 ECB             :         {
                                518                 :             char       *start;
                                519                 :             shm_mq     *mq;
 2718 rhaas                     520                 : 
 1892 tgl                       521 GIC         415 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
                                522             415 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
                                523             415 :             shm_mq_set_receiver(mq, MyProc);
                                524             415 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
                                525                 :         }
 2718 rhaas                     526 ECB             :     }
 2718 rhaas                     527 CBC         131 : }
                                528                 : 
                                529                 : /*
                                530                 :  * Reinitialize parallel workers for a parallel context such that we could
                                531                 :  * launch a different number of workers.  This is required for cases where
 1175 akapila                   532 ECB             :  * we need to reuse the same DSM segment, but the number of workers can
                                533                 :  * vary from run-to-run.
                                534                 :  */
                                535                 : void
 1175 akapila                   536 GIC          11 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
                                537                 : {
 1175 akapila                   538 ECB             :     /*
                                539                 :      * The number of workers that need to be launched must be less than the
                                540                 :      * number of workers with which the parallel context is initialized.
                                541                 :      */
 1175 akapila                   542 GIC          11 :     Assert(pcxt->nworkers >= nworkers_to_launch);
                                543              11 :     pcxt->nworkers_to_launch = nworkers_to_launch;
                                544              11 : }
                                545                 : 
                                546                 : /*
 2901 rhaas                     547 ECB             :  * Launch parallel workers.
                                548                 :  */
                                549                 : void
 2901 rhaas                     550 GIC         534 : LaunchParallelWorkers(ParallelContext *pcxt)
                                551                 : {
                                552                 :     MemoryContext oldcontext;
 2878 bruce                     553 ECB             :     BackgroundWorker worker;
                                554                 :     int         i;
 2878 bruce                     555 CBC         534 :     bool        any_registrations_failed = false;
                                556                 : 
                                557                 :     /* Skip this if we have no workers. */
 1175 akapila                   558 GIC         534 :     if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
 2901 rhaas                     559 UIC           0 :         return;
                                560                 : 
 2618 rhaas                     561 ECB             :     /* We need to be a lock group leader. */
 2618 rhaas                     562 GIC         534 :     BecomeLockGroupLeader();
                                563                 : 
                                564                 :     /* If we do have workers, we'd better have a DSM segment. */
 2901                           565             534 :     Assert(pcxt->seg != NULL);
 2901 rhaas                     566 ECB             : 
                                567                 :     /* We might be running in a short-lived memory context. */
 2901 rhaas                     568 GIC         534 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 2901 rhaas                     569 ECB             : 
 2901 rhaas                     570 EUB             :     /* Configure a worker. */
 2184 tgl                       571 GIC         534 :     memset(&worker, 0, sizeof(worker));
 2901 rhaas                     572             534 :     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
 2901 rhaas                     573 ECB             :              MyProcPid);
 2047 peter_e                   574 GIC         534 :     snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
 2901 rhaas                     575             534 :     worker.bgw_flags =
 2319 rhaas                     576 ECB             :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
                                577                 :         | BGWORKER_CLASS_PARALLEL;
 2901 rhaas                     578 GIC         534 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
 2901 rhaas                     579 CBC         534 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
 2200 rhaas                     580 GIC         534 :     sprintf(worker.bgw_library_name, "postgres");
                                581             534 :     sprintf(worker.bgw_function_name, "ParallelWorkerMain");
 2901 rhaas                     582 CBC         534 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
                                583             534 :     worker.bgw_notify_pid = MyProcPid;
                                584                 : 
 2901 rhaas                     585 ECB             :     /*
                                586                 :      * Start workers.
                                587                 :      *
                                588                 :      * The caller must be able to tolerate ending up with fewer workers than
                                589                 :      * expected, so there is no need to throw an error here if registration
 2878 bruce                     590                 :      * fails.  It wouldn't help much anyway, because registering the worker in
                                591                 :      * no way guarantees that it will start up and initialize successfully.
 2901 rhaas                     592                 :      */
 1175 akapila                   593 CBC        1870 :     for (i = 0; i < pcxt->nworkers_to_launch; ++i)
 2901 rhaas                     594 ECB             :     {
 2712 rhaas                     595 GIC        1336 :         memcpy(worker.bgw_extra, &i, sizeof(int));
 2901                           596            2645 :         if (!any_registrations_failed &&
                                597            1309 :             RegisterDynamicBackgroundWorker(&worker,
                                598            1309 :                                             &pcxt->worker[i].bgwhandle))
                                599                 :         {
                                600            1298 :             shm_mq_set_handle(pcxt->worker[i].error_mqh,
                                601            1298 :                               pcxt->worker[i].bgwhandle);
 2732                           602            1298 :             pcxt->nworkers_launched++;
                                603                 :         }
 2901 rhaas                     604 ECB             :         else
                                605                 :         {
                                606                 :             /*
 2878 bruce                     607                 :              * If we weren't able to register the worker, then we've bumped up
                                608                 :              * against the max_worker_processes limit, and future
 2901 rhaas                     609                 :              * registrations will probably fail too, so arrange to skip them.
                                610                 :              * But we still have to execute this code for the remaining slots
                                611                 :              * to make sure that we forget about the error queues we budgeted
                                612                 :              * for those workers.  Otherwise, we'll wait for them to start,
                                613                 :              * but they never will.
                                614                 :              */
 2901 rhaas                     615 GIC          38 :             any_registrations_failed = true;
                                616              38 :             pcxt->worker[i].bgwhandle = NULL;
 2047 tgl                       617              38 :             shm_mq_detach(pcxt->worker[i].error_mqh);
 2901 rhaas                     618              38 :             pcxt->worker[i].error_mqh = NULL;
                                619                 :         }
                                620                 :     }
                                621                 : 
                                622                 :     /*
                                623                 :      * Now that nworkers_launched has taken its final value, we can initialize
                                624                 :      * known_attached_workers.
                                625                 :      */
 1902 rhaas                     626 CBC         534 :     if (pcxt->nworkers_launched > 0)
 1892 rhaas                     627 ECB             :     {
 1892 rhaas                     628 CBC         525 :         pcxt->known_attached_workers =
 1902                           629             525 :             palloc0(sizeof(bool) * pcxt->nworkers_launched);
 1892 rhaas                     630 GIC         525 :         pcxt->nknown_attached_workers = 0;
                                631                 :     }
                                632                 : 
                                633                 :     /* Restore previous memory context. */
 2901                           634             534 :     MemoryContextSwitchTo(oldcontext);
                                635                 : }
                                636                 : 
 1892 rhaas                     637 ECB             : /*
                                638                 :  * Wait for all workers to attach to their error queues, and throw an error if
                                639                 :  * any worker fails to do this.
                                640                 :  *
                                641                 :  * Callers can assume that if this function returns successfully, then the
                                642                 :  * number of workers given by pcxt->nworkers_launched have initialized and
                                643                 :  * attached to their error queues.  Whether or not these workers are guaranteed
                                644                 :  * to still be running depends on what code the caller asked them to run;
                                645                 :  * this function does not guarantee that they have not exited.  However, it
                                646                 :  * does guarantee that any workers which exited must have done so cleanly and
                                647                 :  * after successfully performing the work with which they were tasked.
                                648                 :  *
                                649                 :  * If this function is not called, then some of the workers that were launched
                                650                 :  * may not have been started due to a fork() failure, or may have exited during
                                651                 :  * early startup prior to attaching to the error queue, so nworkers_launched
                                652                 :  * cannot be viewed as completely reliable.  It will never be less than the
                                653                 :  * number of workers which actually started, but it might be more.  Any workers
                                654                 :  * that failed to start will still be discovered by
                                655                 :  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
                                656                 :  * provided that function is eventually reached.
                                657                 :  *
                                658                 :  * In general, the leader process should do as much work as possible before
                                659                 :  * calling this function.  fork() failures and other early-startup failures
                                660                 :  * are very uncommon, and having the leader sit idle when it could be doing
                                661                 :  * useful work is undesirable.  However, if the leader needs to wait for
                                662                 :  * all of its workers or for a specific worker, it may want to call this
                                663                 :  * function before doing so.  If not, it must make some other provision for
                                664                 :  * the failure-to-start case, lest it wait forever.  On the other hand, a
                                665                 :  * leader which never waits for a worker that might not be started yet, or
                                666                 :  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
                                667                 :  * call this function at all.
                                668                 :  */
                                669                 : void
 1892 rhaas                     670 GIC          71 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
                                671                 : {
                                672                 :     int         i;
                                673                 : 
                                674                 :     /* Skip this if we have no launched workers. */
                                675              71 :     if (pcxt->nworkers_launched == 0)
 1892 rhaas                     676 UIC           0 :         return;
                                677                 : 
                                678                 :     for (;;)
                                679                 :     {
                                680                 :         /*
 1892 rhaas                     681 ECB             :          * This will process any parallel messages that are pending and it may
                                682                 :          * also throw an error propagated from a worker.
                                683                 :          */
 1892 rhaas                     684 GIC     4193812 :         CHECK_FOR_INTERRUPTS();
                                685                 : 
 1892 rhaas                     686 CBC     8387624 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
 1892 rhaas                     687 EUB             :         {
                                688                 :             BgwHandleStatus status;
                                689                 :             shm_mq     *mq;
                                690                 :             int         rc;
                                691                 :             pid_t       pid;
                                692                 : 
 1892 rhaas                     693 GIC     4193812 :             if (pcxt->known_attached_workers[i])
                                694              14 :                 continue;
 1892 rhaas                     695 ECB             : 
                                696                 :             /*
                                697                 :              * If error_mqh is NULL, then the worker has already exited
                                698                 :              * cleanly.
                                699                 :              */
 1892 rhaas                     700 GIC     4193798 :             if (pcxt->worker[i].error_mqh == NULL)
                                701                 :             {
 1892 rhaas                     702 UIC           0 :                 pcxt->known_attached_workers[i] = true;
                                703               0 :                 ++pcxt->nknown_attached_workers;
 1892 rhaas                     704 LBC           0 :                 continue;
 1892 rhaas                     705 ECB             :             }
                                706                 : 
 1892 rhaas                     707 GIC     4193798 :             status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
                                708         4193798 :             if (status == BGWH_STARTED)
                                709                 :             {
                                710                 :                 /* Has the worker attached to the error queue? */
 1892 rhaas                     711 CBC     4193742 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
 1892 rhaas                     712 GIC     4193742 :                 if (shm_mq_get_sender(mq) != NULL)
 1892 rhaas                     713 EUB             :                 {
                                714                 :                     /* Yes, so it is known to be attached. */
 1892 rhaas                     715 GBC          57 :                     pcxt->known_attached_workers[i] = true;
 1892 rhaas                     716 GIC          57 :                     ++pcxt->nknown_attached_workers;
                                717                 :                 }
 1892 rhaas                     718 ECB             :             }
 1892 rhaas                     719 CBC          56 :             else if (status == BGWH_STOPPED)
                                720                 :             {
                                721                 :                 /*
 1892 rhaas                     722 ECB             :                  * If the worker stopped without attaching to the error queue,
                                723                 :                  * throw an error.
                                724                 :                  */
 1892 rhaas                     725 UIC           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
 1892 rhaas                     726 LBC           0 :                 if (shm_mq_get_sender(mq) == NULL)
                                727               0 :                     ereport(ERROR,
                                728                 :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                729                 :                              errmsg("parallel worker failed to initialize"),
 1892 rhaas                     730 ECB             :                              errhint("More details may be available in the server log.")));
                                731                 : 
 1892 rhaas                     732 UIC           0 :                 pcxt->known_attached_workers[i] = true;
                                733               0 :                 ++pcxt->nknown_attached_workers;
                                734                 :             }
                                735                 :             else
 1892 rhaas                     736 EUB             :             {
                                737                 :                 /*
                                738                 :                  * Worker not yet started, so we must wait.  The postmaster
                                739                 :                  * will notify us if the worker's state changes.  Our latch
                                740                 :                  * might also get set for some other reason, but if so we'll
                                741                 :                  * just end up waiting for the same worker again.
                                742                 :                  */
 1892 rhaas                     743 GBC          56 :                 rc = WaitLatch(MyLatch,
 1598 tmunro                    744 EUB             :                                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
                                745                 :                                -1, WAIT_EVENT_BGWORKER_STARTUP);
                                746                 : 
 1892 rhaas                     747 GIC          56 :                 if (rc & WL_LATCH_SET)
                                748              56 :                     ResetLatch(MyLatch);
                                749                 :             }
                                750                 :         }
                                751                 : 
                                752                 :         /* If all workers are known to have started, we're done. */
                                753         4193812 :         if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
 1892 rhaas                     754 ECB             :         {
 1892 rhaas                     755 GIC          71 :             Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
                                756              71 :             break;
                                757                 :         }
 1892 rhaas                     758 ECB             :     }
                                759                 : }
                                760                 : 
                                761                 : /*
                                762                 :  * Wait for all workers to finish computing.
                                763                 :  *
 2901                           764                 :  * Even if the parallel operation seems to have completed successfully, it's
                                765                 :  * important to call this function afterwards.  We must not miss any errors
                                766                 :  * the workers may have thrown during the parallel operation, or any that they
                                767                 :  * may yet throw while shutting down.
                                768                 :  *
                                769                 :  * Also, we want to update our notion of XactLastRecEnd based on worker
                                770                 :  * feedback.
                                771                 :  */
                                772                 : void
 2901 rhaas                     773 GIC         662 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
                                774                 : {
                                775                 :     for (;;)
                                776             569 :     {
 2878 bruce                     777            1231 :         bool        anyone_alive = false;
 1902 rhaas                     778            1231 :         int         nfinished = 0;
                                779                 :         int         i;
                                780                 : 
                                781                 :         /*
                                782                 :          * This will process any parallel messages that are pending, which may
                                783                 :          * change the outcome of the loop that follows.  It may also throw an
 2878 bruce                     784 ECB             :          * error propagated from a worker.
                                785                 :          */
 2901 rhaas                     786 GIC        1231 :         CHECK_FOR_INTERRUPTS();
 2901 rhaas                     787 ECB             : 
 2592 rhaas                     788 CBC        3281 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
 2901 rhaas                     789 ECB             :         {
                                790                 :             /*
                                791                 :              * If error_mqh is NULL, then the worker has already exited
                                792                 :              * cleanly.  If we have received a message through error_mqh from
                                793                 :              * the worker, we know it started up cleanly, and therefore we're
                                794                 :              * certain to be notified when it exits.
                                795                 :              */
 1902 rhaas                     796 GIC        2591 :             if (pcxt->worker[i].error_mqh == NULL)
 1902 rhaas                     797 CBC        1998 :                 ++nfinished;
 1892 rhaas                     798 GIC         593 :             else if (pcxt->known_attached_workers[i])
 2901 rhaas                     799 ECB             :             {
 2901 rhaas                     800 GIC         541 :                 anyone_alive = true;
                                801             541 :                 break;
                                802                 :             }
                                803                 :         }
                                804                 : 
                                805            1231 :         if (!anyone_alive)
                                806                 :         {
 1902 rhaas                     807 ECB             :             /* If all workers are known to have finished, we're done. */
 1902 rhaas                     808 CBC         690 :             if (nfinished >= pcxt->nworkers_launched)
 1902 rhaas                     809 ECB             :             {
 1902 rhaas                     810 GIC         662 :                 Assert(nfinished == pcxt->nworkers_launched);
 1902 rhaas                     811 CBC         662 :                 break;
 1902 rhaas                     812 ECB             :             }
                                813                 : 
                                814                 :             /*
                                815                 :              * We didn't detect any living workers, but not all workers are
                                816                 :              * known to have exited cleanly.  Either not all workers have
                                817                 :              * launched yet, or maybe some of them failed to start or
                                818                 :              * terminated abnormally.
                                819                 :              */
 1902 rhaas                     820 GIC          76 :             for (i = 0; i < pcxt->nworkers_launched; ++i)
 1902 rhaas                     821 ECB             :             {
                                822                 :                 pid_t       pid;
                                823                 :                 shm_mq     *mq;
                                824                 : 
                                825                 :                 /*
                                826                 :                  * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
                                827                 :                  * should just keep waiting.  If it is BGWH_STOPPED, then
                                828                 :                  * further investigation is needed.
                                829                 :                  */
 1902 rhaas                     830 GIC          48 :                 if (pcxt->worker[i].error_mqh == NULL ||
 1902 rhaas                     831 CBC          92 :                     pcxt->worker[i].bgwhandle == NULL ||
 1902 rhaas                     832 GIC          46 :                     GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
                                833                 :                                            &pid) != BGWH_STOPPED)
                                834              48 :                     continue;
                                835                 : 
                                836                 :                 /*
                                837                 :                  * Check whether the worker ended up stopped without ever
                                838                 :                  * attaching to the error queue.  If so, the postmaster was
                                839                 :                  * unable to fork the worker or it exited without initializing
                                840                 :                  * properly.  We must throw an error, since the caller may
 1902 rhaas                     841 ECB             :                  * have been expecting the worker to do some work before
                                842                 :                  * exiting.
                                843                 :                  */
 1902 rhaas                     844 UIC           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
 1902 rhaas                     845 LBC           0 :                 if (shm_mq_get_sender(mq) == NULL)
 1902 rhaas                     846 UIC           0 :                     ereport(ERROR,
                                847                 :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                848                 :                              errmsg("parallel worker failed to initialize"),
                                849                 :                              errhint("More details may be available in the server log.")));
                                850                 : 
                                851                 :                 /*
                                852                 :                  * The worker is stopped, but is attached to the error queue.
                                853                 :                  * Unless there's a bug somewhere, this will only happen when
                                854                 :                  * the worker writes messages and terminates after the
 1902 rhaas                     855 EUB             :                  * CHECK_FOR_INTERRUPTS() near the top of this function and
                                856                 :                  * before the call to GetBackgroundWorkerPid().  In that case,
                                857                 :                  * or latch should have been set as well and the right things
                                858                 :                  * will happen on the next pass through the loop.
                                859                 :                  */
                                860                 :             }
                                861                 :         }
                                862                 : 
 1598 tmunro                    863 GIC         569 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
                                864                 :                          WAIT_EVENT_PARALLEL_FINISH);
 2133 andres                    865             569 :         ResetLatch(MyLatch);
                                866                 :     }
                                867                 : 
 2901 rhaas                     868             662 :     if (pcxt->toc != NULL)
                                869                 :     {
                                870                 :         FixedParallelState *fps;
                                871                 : 
 2134 tgl                       872             662 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
 2901 rhaas                     873             662 :         if (fps->last_xlog_end > XactLastRecEnd)
 2901 rhaas                     874 CBC          10 :             XactLastRecEnd = fps->last_xlog_end;
                                875                 :     }
                                876             662 : }
                                877                 : 
                                878                 : /*
 2718 rhaas                     879 ECB             :  * Wait for all workers to exit.
                                880                 :  *
                                881                 :  * This function ensures that workers have been completely shutdown.  The
                                882                 :  * difference between WaitForParallelWorkersToFinish and this function is
 1029 andres                    883                 :  * that the former just ensures that last message sent by a worker backend is
                                884                 :  * received by the leader backend whereas this ensures the complete shutdown.
 2718 rhaas                     885                 :  */
                                886                 : static void
 2718 rhaas                     887 CBC         534 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
                                888                 : {
                                889                 :     int         i;
                                890                 : 
                                891                 :     /* Wait until the workers actually die. */
 2592 rhaas                     892 GIC        1832 :     for (i = 0; i < pcxt->nworkers_launched; ++i)
                                893                 :     {
                                894                 :         BgwHandleStatus status;
                                895                 : 
 2718                           896            1298 :         if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
 2718 rhaas                     897 UIC           0 :             continue;
 2718 rhaas                     898 ECB             : 
 2718 rhaas                     899 GIC        1298 :         status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
                                900                 : 
                                901                 :         /*
                                902                 :          * If the postmaster kicked the bucket, we have no chance of cleaning
 2718 rhaas                     903 ECB             :          * up safely -- we won't be able to tell when our workers are actually
                                904                 :          * dead.  This doesn't necessitate a PANIC since they will all abort
                                905                 :          * eventually, but we can't safely continue this session.
                                906                 :          */
 2718 rhaas                     907 CBC        1298 :         if (status == BGWH_POSTMASTER_DIED)
 2718 rhaas                     908 UBC           0 :             ereport(FATAL,
                                909                 :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
 2118 tgl                       910 ECB             :                      errmsg("postmaster exited during a parallel transaction")));
                                911                 : 
                                912                 :         /* Release memory. */
 2718 rhaas                     913 GIC        1298 :         pfree(pcxt->worker[i].bgwhandle);
                                914            1298 :         pcxt->worker[i].bgwhandle = NULL;
                                915                 :     }
                                916             534 : }
                                917                 : 
 2901 rhaas                     918 ECB             : /*
 2901 rhaas                     919 EUB             :  * Destroy a parallel context.
                                920                 :  *
                                921                 :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
                                922                 :  * first, before calling this function.  When this function is invoked, any
                                923                 :  * remaining workers are forcibly killed; the dynamic shared memory segment
 2901 rhaas                     924 ECB             :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
                                925                 :  */
                                926                 : void
 2901 rhaas                     927 CBC         403 : DestroyParallelContext(ParallelContext *pcxt)
                                928                 : {
                                929                 :     int         i;
                                930                 : 
                                931                 :     /*
                                932                 :      * Be careful about order of operations here!  We remove the parallel
                                933                 :      * context from the list before we do anything else; otherwise, if an
                                934                 :      * error occurs during a subsequent step, we might try to nuke it again
                                935                 :      * from AtEOXact_Parallel or AtEOSubXact_Parallel.
                                936                 :      */
 2901 rhaas                     937 GIC         403 :     dlist_delete(&pcxt->node);
 2901 rhaas                     938 ECB             : 
                                939                 :     /* Kill each worker in turn, and forget their error queues. */
 2748 rhaas                     940 GIC         403 :     if (pcxt->worker != NULL)
                                941                 :     {
 2592                           942            1286 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
                                943                 :         {
 2748                           944             883 :             if (pcxt->worker[i].error_mqh != NULL)
                                945                 :             {
 2718                           946               3 :                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
                                947                 : 
 2047 tgl                       948 CBC           3 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
 2748 rhaas                     949 GIC           3 :                 pcxt->worker[i].error_mqh = NULL;
                                950                 :             }
 2901 rhaas                     951 ECB             :         }
                                952                 :     }
                                953                 : 
                                954                 :     /*
                                955                 :      * If we have allocated a shared memory segment, detach it.  This will
                                956                 :      * implicitly detach the error queues, and any other shared memory queues,
                                957                 :      * stored there.
                                958                 :      */
 2901 rhaas                     959 CBC         403 :     if (pcxt->seg != NULL)
 2901 rhaas                     960 ECB             :     {
 2901 rhaas                     961 GIC         403 :         dsm_detach(pcxt->seg);
                                962             403 :         pcxt->seg = NULL;
                                963                 :     }
                                964                 : 
                                965                 :     /*
                                966                 :      * If this parallel context is actually in backend-private memory rather
                                967                 :      * than shared memory, free that memory instead.
                                968                 :      */
 2896                           969             403 :     if (pcxt->private_memory != NULL)
 2901 rhaas                     970 ECB             :     {
 2896 rhaas                     971 UIC           0 :         pfree(pcxt->private_memory);
 2896 rhaas                     972 LBC           0 :         pcxt->private_memory = NULL;
 2901 rhaas                     973 ECB             :     }
                                974                 : 
                                975                 :     /*
                                976                 :      * We can't finish transaction commit or abort until all of the workers
                                977                 :      * have exited.  This means, in particular, that we can't respond to
                                978                 :      * interrupts at this stage.
                                979                 :      */
 2718 rhaas                     980 CBC         403 :     HOLD_INTERRUPTS();
 2718 rhaas                     981 GIC         403 :     WaitForParallelWorkersToExit(pcxt);
 2718 rhaas                     982 GBC         403 :     RESUME_INTERRUPTS();
 2901 rhaas                     983 EUB             : 
                                984                 :     /* Free the worker array itself. */
 2901 rhaas                     985 GIC         403 :     if (pcxt->worker != NULL)
                                986                 :     {
                                987             403 :         pfree(pcxt->worker);
                                988             403 :         pcxt->worker = NULL;
                                989                 :     }
                                990                 : 
 2901 rhaas                     991 ECB             :     /* Free memory. */
 2186 tgl                       992 CBC         403 :     pfree(pcxt->library_name);
                                993             403 :     pfree(pcxt->function_name);
 2901 rhaas                     994 GIC         403 :     pfree(pcxt);
                                995             403 : }
 2901 rhaas                     996 ECB             : 
                                997                 : /*
                                998                 :  * Are there any parallel contexts currently active?
                                999                 :  */
                               1000                 : bool
 2901 rhaas                    1001 GIC      483208 : ParallelContextActive(void)
                               1002                 : {
 2901 rhaas                    1003 CBC      483208 :     return !dlist_is_empty(&pcxt_list);
 2901 rhaas                    1004 ECB             : }
                               1005                 : 
                               1006                 : /*
                               1007                 :  * Handle receipt of an interrupt indicating a parallel worker message.
                               1008                 :  *
                               1009                 :  * Note: this is called within a signal handler!  All we can do is set
                               1010                 :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
                               1011                 :  * HandleParallelMessages().
                               1012                 :  */
                               1013                 : void
 2901 rhaas                    1014 CBC        2711 : HandleParallelMessageInterrupt(void)
                               1015                 : {
 2901 rhaas                    1016 GIC        2711 :     InterruptPending = true;
                               1017            2711 :     ParallelMessagePending = true;
                               1018            2711 :     SetLatch(MyLatch);
                               1019            2711 : }
                               1020                 : 
                               1021                 : /*
                               1022                 :  * Handle any queued protocol messages received from parallel workers.
                               1023                 :  */
                               1024                 : void
 2901 rhaas                    1025 CBC        2606 : HandleParallelMessages(void)
                               1026                 : {
 2901 rhaas                    1027 ECB             :     dlist_iter  iter;
 2417 tgl                      1028                 :     MemoryContext oldcontext;
                               1029                 : 
                               1030                 :     static MemoryContext hpm_context = NULL;
                               1031                 : 
                               1032                 :     /*
                               1033                 :      * This is invoked from ProcessInterrupts(), and since some of the
                               1034                 :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
                               1035                 :      * for recursive calls if more signals are received while this runs.  It's
 2441                          1036                 :      * unclear that recursive entry would be safe, and it doesn't seem useful
                               1037                 :      * even if it is safe, so let's block interrupts until done.
                               1038                 :      */
 2441 tgl                      1039 GIC        2606 :     HOLD_INTERRUPTS();
                               1040                 : 
                               1041                 :     /*
                               1042                 :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
                               1043                 :      * don't want to risk leaking data into long-lived contexts, so let's do
                               1044                 :      * our work here in a private context that we can reset on each use.
                               1045                 :      */
 2417                          1046            2606 :     if (hpm_context == NULL)    /* first time through? */
                               1047              55 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
                               1048                 :                                             "HandleParallelMessages",
                               1049                 :                                             ALLOCSET_DEFAULT_SIZES);
 2417 tgl                      1050 ECB             :     else
 2417 tgl                      1051 GIC        2551 :         MemoryContextReset(hpm_context);
                               1052                 : 
                               1053            2606 :     oldcontext = MemoryContextSwitchTo(hpm_context);
                               1054                 : 
                               1055                 :     /* OK to process messages.  Reset the flag saying there are more to do. */
 2901 rhaas                    1056            2606 :     ParallelMessagePending = false;
 2901 rhaas                    1057 ECB             : 
 2901 rhaas                    1058 CBC        5215 :     dlist_foreach(iter, &pcxt_list)
                               1059                 :     {
                               1060                 :         ParallelContext *pcxt;
                               1061                 :         int         i;
 2901 rhaas                    1062 ECB             : 
 2901 rhaas                    1063 GIC        2612 :         pcxt = dlist_container(ParallelContext, node, iter.cur);
 2901 rhaas                    1064 CBC        2612 :         if (pcxt->worker == NULL)
 2901 rhaas                    1065 UIC           0 :             continue;
                               1066                 : 
 2592 rhaas                    1067 CBC       10396 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
                               1068                 :         {
 2901 rhaas                    1069 ECB             :             /*
                               1070                 :              * Read as many messages as we can from each worker, but stop when
                               1071                 :              * either (1) the worker's error queue goes away, which can happen
                               1072                 :              * if we receive a Terminate message from the worker; or (2) no
                               1073                 :              * more messages can be read from the worker without blocking.
                               1074                 :              */
 2901 rhaas                    1075 CBC       10380 :             while (pcxt->worker[i].error_mqh != NULL)
 2901 rhaas                    1076 EUB             :             {
                               1077                 :                 shm_mq_result res;
 2441 tgl                      1078 ECB             :                 Size        nbytes;
                               1079                 :                 void       *data;
                               1080                 : 
 2901 rhaas                    1081 GIC        7560 :                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
                               1082                 :                                      &data, true);
                               1083            7560 :                 if (res == SHM_MQ_WOULD_BLOCK)
                               1084            4964 :                     break;
                               1085            2596 :                 else if (res == SHM_MQ_SUCCESS)
 2901 rhaas                    1086 ECB             :                 {
                               1087                 :                     StringInfoData msg;
                               1088                 : 
 2901 rhaas                    1089 GIC        2596 :                     initStringInfo(&msg);
                               1090            2596 :                     appendBinaryStringInfo(&msg, data, nbytes);
                               1091            2596 :                     HandleParallelMessage(pcxt, i, &msg);
 2901 rhaas                    1092 CBC        2593 :                     pfree(msg.data);
                               1093                 :                 }
 2901 rhaas                    1094 ECB             :                 else
 2901 rhaas                    1095 LBC           0 :                     ereport(ERROR,
 2118 tgl                      1096 ECB             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1097                 :                              errmsg("lost connection to parallel worker")));
                               1098                 :             }
                               1099                 :         }
 2901 rhaas                    1100                 :     }
 2441 tgl                      1101                 : 
 2417 tgl                      1102 CBC        2603 :     MemoryContextSwitchTo(oldcontext);
 2417 tgl                      1103 ECB             : 
                               1104                 :     /* Might as well clear the context on our way out */
 2417 tgl                      1105 GIC        2603 :     MemoryContextReset(hpm_context);
 2417 tgl                      1106 EUB             : 
 2441 tgl                      1107 GIC        2603 :     RESUME_INTERRUPTS();
 2901 rhaas                    1108            2603 : }
                               1109                 : 
                               1110                 : /*
                               1111                 :  * Handle a single protocol message received from a single parallel worker.
                               1112                 :  */
 2901 rhaas                    1113 ECB             : static void
 2901 rhaas                    1114 GIC        2596 : HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
                               1115                 : {
 2878 bruce                    1116 ECB             :     char        msgtype;
                               1117                 : 
 1892 rhaas                    1118 CBC        2596 :     if (pcxt->known_attached_workers != NULL &&
                               1119            2596 :         !pcxt->known_attached_workers[i])
                               1120                 :     {
 1892 rhaas                    1121 GIC        1241 :         pcxt->known_attached_workers[i] = true;
                               1122            1241 :         pcxt->nknown_attached_workers++;
                               1123                 :     }
                               1124                 : 
 2901 rhaas                    1125 CBC        2596 :     msgtype = pq_getmsgbyte(msg);
                               1126                 : 
 2901 rhaas                    1127 GIC        2596 :     switch (msgtype)
                               1128                 :     {
 2878 bruce                    1129 CBC        1298 :         case 'K':               /* BackendKeyData */
 2901 rhaas                    1130 ECB             :             {
 2878 bruce                    1131 GIC        1298 :                 int32       pid = pq_getmsgint(msg, 4);
 2878 bruce                    1132 ECB             : 
 2901 rhaas                    1133 CBC        1298 :                 (void) pq_getmsgint(msg, 4);    /* discard cancel key */
 2901 rhaas                    1134 GIC        1298 :                 (void) pq_getmsgend(msg);
                               1135            1298 :                 pcxt->worker[i].pid = pid;
 2901 rhaas                    1136 CBC        1298 :                 break;
                               1137                 :             }
 2901 rhaas                    1138 ECB             : 
 2878 bruce                    1139 GIC           3 :         case 'E':               /* ErrorResponse */
 2878 bruce                    1140 ECB             :         case 'N':               /* NoticeResponse */
                               1141                 :             {
 2901 rhaas                    1142                 :                 ErrorData   edata;
                               1143                 :                 ErrorContextCallback *save_error_context_stack;
                               1144                 : 
 2726                          1145                 :                 /* Parse ErrorResponse or NoticeResponse. */
 2901 rhaas                    1146 CBC           3 :                 pq_parse_errornotice(msg, &edata);
 2901 rhaas                    1147 ECB             : 
                               1148                 :                 /* Death of a worker isn't enough justification for suicide. */
 2901 rhaas                    1149 GIC           3 :                 edata.elevel = Min(edata.elevel, ERROR);
 2901 rhaas                    1150 ECB             : 
                               1151                 :                 /*
                               1152                 :                  * If desired, add a context line to show that this is a
                               1153                 :                  * message propagated from a parallel worker.  Otherwise, it
                               1154                 :                  * can sometimes be confusing to understand what actually
                               1155                 :                  * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
                               1156                 :                  * because it causes test-result instability depending on
 2417 tgl                      1157                 :                  * whether a parallel worker is actually used or not.)
                               1158                 :                  */
   53 drowley                  1159 GNC           3 :                 if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
 2417 tgl                      1160 ECB             :                 {
 2417 tgl                      1161 GIC           3 :                     if (edata.context)
 2417 tgl                      1162 UIC           0 :                         edata.context = psprintf("%s\n%s", edata.context,
                               1163                 :                                                  _("parallel worker"));
                               1164                 :                     else
 2417 tgl                      1165 GIC           3 :                         edata.context = pstrdup(_("parallel worker"));
                               1166                 :                 }
                               1167                 : 
                               1168                 :                 /*
                               1169                 :                  * Context beyond that should use the error context callbacks
 2417 tgl                      1170 ECB             :                  * that were in effect when the ParallelContext was created,
                               1171                 :                  * not the current ones.
                               1172                 :                  */
 2417 tgl                      1173 GBC           3 :                 save_error_context_stack = error_context_stack;
 2417 tgl                      1174 GIC           3 :                 error_context_stack = pcxt->error_context_stack;
                               1175                 : 
 2417 tgl                      1176 ECB             :                 /* Rethrow error or print notice. */
 2901 rhaas                    1177 GIC           3 :                 ThrowErrorData(&edata);
                               1178                 : 
                               1179                 :                 /* Not an error, so restore previous context stack. */
 2901 rhaas                    1180 UIC           0 :                 error_context_stack = save_error_context_stack;
                               1181                 : 
                               1182               0 :                 break;
                               1183                 :             }
 2901 rhaas                    1184 ECB             : 
 2878 bruce                    1185 LBC           0 :         case 'A':               /* NotifyResponse */
                               1186                 :             {
                               1187                 :                 /* Propagate NotifyResponse. */
 2474 rhaas                    1188 ECB             :                 int32       pid;
                               1189                 :                 const char *channel;
                               1190                 :                 const char *payload;
 2474 rhaas                    1191 EUB             : 
 2474 rhaas                    1192 UIC           0 :                 pid = pq_getmsgint(msg, 4);
 2474 rhaas                    1193 UBC           0 :                 channel = pq_getmsgrawstring(msg);
 2474 rhaas                    1194 UIC           0 :                 payload = pq_getmsgrawstring(msg);
                               1195               0 :                 pq_endmessage(msg);
 2474 rhaas                    1196 EUB             : 
 2474 rhaas                    1197 UIC           0 :                 NotifyMyFrontEnd(channel, payload, pid);
                               1198                 : 
 2901                          1199               0 :                 break;
                               1200                 :             }
                               1201                 : 
 2878 bruce                    1202 GIC        1295 :         case 'X':               /* Terminate, indicating clean exit */
 2901 rhaas                    1203 EUB             :             {
 2047 tgl                      1204 GBC        1295 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
 2901 rhaas                    1205            1295 :                 pcxt->worker[i].error_mqh = NULL;
                               1206            1295 :                 break;
                               1207                 :             }
 2901 rhaas                    1208 EUB             : 
 2901 rhaas                    1209 UIC           0 :         default:
 2901 rhaas                    1210 EUB             :             {
 2442 tgl                      1211 UIC           0 :                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                               1212                 :                      msgtype, msg->len);
 2901 rhaas                    1213 ECB             :             }
                               1214                 :     }
 2901 rhaas                    1215 CBC        2593 : }
 2901 rhaas                    1216 ECB             : 
                               1217                 : /*
                               1218                 :  * End-of-subtransaction cleanup for parallel contexts.
                               1219                 :  *
 2901 rhaas                    1220 EUB             :  * Currently, it's forbidden to enter or leave a subtransaction while
                               1221                 :  * parallel mode is in effect, so we could just blow away everything.  But
                               1222                 :  * we may want to relax that restriction in the future, so this code
                               1223                 :  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
                               1224                 :  */
                               1225                 : void
 2901 rhaas                    1226 CBC           3 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
                               1227                 : {
 2901 rhaas                    1228 GIC           6 :     while (!dlist_is_empty(&pcxt_list))
                               1229                 :     {
                               1230                 :         ParallelContext *pcxt;
                               1231                 : 
                               1232               3 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
                               1233               3 :         if (pcxt->subid != mySubId)
 2901 rhaas                    1234 UIC           0 :             break;
 2901 rhaas                    1235 GIC           3 :         if (isCommit)
 2901 rhaas                    1236 UIC           0 :             elog(WARNING, "leaked parallel context");
 2901 rhaas                    1237 CBC           3 :         DestroyParallelContext(pcxt);
                               1238                 :     }
                               1239               3 : }
                               1240                 : 
                               1241                 : /*
                               1242                 :  * End-of-transaction cleanup for parallel contexts.
 2901 rhaas                    1243 ECB             :  */
                               1244                 : void
 2901 rhaas                    1245 GBC        1298 : AtEOXact_Parallel(bool isCommit)
 2901 rhaas                    1246 ECB             : {
 2901 rhaas                    1247 GBC        1298 :     while (!dlist_is_empty(&pcxt_list))
 2901 rhaas                    1248 ECB             :     {
                               1249                 :         ParallelContext *pcxt;
                               1250                 : 
 2901 rhaas                    1251 UIC           0 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
                               1252               0 :         if (isCommit)
                               1253               0 :             elog(WARNING, "leaked parallel context");
                               1254               0 :         DestroyParallelContext(pcxt);
                               1255                 :     }
 2901 rhaas                    1256 CBC        1298 : }
                               1257                 : 
 2901 rhaas                    1258 ECB             : /*
                               1259                 :  * Main entrypoint for parallel workers.
                               1260                 :  */
                               1261                 : void
 2901 rhaas                    1262 GBC        1298 : ParallelWorkerMain(Datum main_arg)
 2901 rhaas                    1263 EUB             : {
                               1264                 :     dsm_segment *seg;
 2878 bruce                    1265                 :     shm_toc    *toc;
                               1266                 :     FixedParallelState *fps;
 2878 bruce                    1267 ECB             :     char       *error_queue_space;
                               1268                 :     shm_mq     *mq;
                               1269                 :     shm_mq_handle *mqh;
                               1270                 :     char       *libraryspace;
                               1271                 :     char       *entrypointstate;
                               1272                 :     char       *library_name;
 2186 tgl                      1273                 :     char       *function_name;
                               1274                 :     parallel_worker_main_type entrypt;
                               1275                 :     char       *gucspace;
                               1276                 :     char       *combocidspace;
                               1277                 :     char       *tsnapspace;
                               1278                 :     char       *asnapspace;
                               1279                 :     char       *tstatespace;
                               1280                 :     char       *pendingsyncsspace;
                               1281                 :     char       *reindexspace;
                               1282                 :     char       *relmapperspace;
                               1283                 :     char       *uncommittedenumsspace;
                               1284                 :     char       *clientconninfospace;
                               1285                 :     StringInfoData msgbuf;
                               1286                 :     char       *session_dsm_handle_space;
                               1287                 :     Snapshot    tsnapshot;
                               1288                 :     Snapshot    asnapshot;
                               1289                 : 
                               1290                 :     /* Set flag to indicate that we're initializing a parallel worker. */
 2732 rhaas                    1291 GIC        1298 :     InitializingParallelWorker = true;
                               1292                 : 
                               1293                 :     /* Establish signal handlers. */
 2901                          1294            1298 :     pqsignal(SIGTERM, die);
                               1295            1298 :     BackgroundWorkerUnblockSignals();
                               1296                 : 
                               1297                 :     /* Determine and set our parallel worker number. */
 2712                          1298            1298 :     Assert(ParallelWorkerNumber == -1);
                               1299            1298 :     memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
                               1300                 : 
                               1301                 :     /* Set up a memory context to work in, just for cleanliness. */
 2901                          1302            1298 :     CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
 2416 tgl                      1303 ECB             :                                                  "Parallel worker",
                               1304                 :                                                  ALLOCSET_DEFAULT_SIZES);
                               1305                 : 
 2901 rhaas                    1306                 :     /*
 1726 tgl                      1307                 :      * Attach to the dynamic shared memory segment for the parallel query, and
                               1308                 :      * find its table of contents.
                               1309                 :      *
                               1310                 :      * Note: at this point, we have not created any ResourceOwner in this
                               1311                 :      * process.  This will result in our DSM mapping surviving until process
                               1312                 :      * exit, which is fine.  If there were a ResourceOwner, it would acquire
                               1313                 :      * ownership of the mapping, but we have no need for that.
 2901 rhaas                    1314                 :      */
 2901 rhaas                    1315 GIC        1298 :     seg = dsm_attach(DatumGetUInt32(main_arg));
                               1316            1298 :     if (seg == NULL)
 2901 rhaas                    1317 UIC           0 :         ereport(ERROR,
                               1318                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1319                 :                  errmsg("could not map dynamic shared memory segment")));
 2901 rhaas                    1320 GIC        1298 :     toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
                               1321            1298 :     if (toc == NULL)
 2901 rhaas                    1322 UIC           0 :         ereport(ERROR,
                               1323                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1324                 :                  errmsg("invalid magic number in dynamic shared memory segment")));
                               1325                 : 
                               1326                 :     /* Look up fixed parallel state. */
 2134 tgl                      1327 CBC        1298 :     fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
 2901 rhaas                    1328            1298 :     MyFixedParallelState = fps;
 2901 rhaas                    1329 EUB             : 
                               1330                 :     /* Arrange to signal the leader if we exit. */
 1029 andres                   1331 GIC        1298 :     ParallelLeaderPid = fps->parallel_leader_pid;
 1029 andres                   1332 CBC        1298 :     ParallelLeaderBackendId = fps->parallel_leader_backend_id;
  739                          1333            1298 :     before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
 1902 rhaas                    1334 EUB             : 
                               1335                 :     /*
                               1336                 :      * Now we can find and attach to the error queue provided for us.  That's
                               1337                 :      * good, because until we do that, any errors that happen here will not be
                               1338                 :      * reported back to the process that requested that this worker be
 1902 rhaas                    1339 ECB             :      * launched.
 2901                          1340                 :      */
 2134 tgl                      1341 GIC        1298 :     error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
 2901 rhaas                    1342            1298 :     mq = (shm_mq *) (error_queue_space +
 2878 bruce                    1343 CBC        1298 :                      ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
 2901 rhaas                    1344            1298 :     shm_mq_set_sender(mq, MyProc);
                               1345            1298 :     mqh = shm_mq_attach(mq, seg, NULL);
 2732 rhaas                    1346 GIC        1298 :     pq_redirect_to_shm_mq(seg, mqh);
 1029 andres                   1347            1298 :     pq_set_parallel_leader(fps->parallel_leader_pid,
                               1348                 :                            fps->parallel_leader_backend_id);
                               1349                 : 
                               1350                 :     /*
                               1351                 :      * Send a BackendKeyData message to the process that initiated parallelism
                               1352                 :      * so that it has access to our PID before it receives any other messages
 2878 bruce                    1353 ECB             :      * from us.  Our cancel key is sent, too, since that's the way the
                               1354                 :      * protocol message is defined, but it won't actually be used for anything
                               1355                 :      * in this case.
 2901 rhaas                    1356                 :      */
 2901 rhaas                    1357 CBC        1298 :     pq_beginmessage(&msgbuf, 'K');
 2006 andres                   1358            1298 :     pq_sendint32(&msgbuf, (int32) MyProcPid);
                               1359            1298 :     pq_sendint32(&msgbuf, (int32) MyCancelKey);
 2901 rhaas                    1360 GIC        1298 :     pq_endmessage(&msgbuf);
                               1361                 : 
                               1362                 :     /*
                               1363                 :      * Hooray! Primary initialization is complete.  Now, we need to set up our
                               1364                 :      * backend-local state to match the original backend.
                               1365                 :      */
                               1366                 : 
                               1367                 :     /*
                               1368                 :      * Join locking group.  We must do this before anything that could try to
 2495 rhaas                    1369 ECB             :      * acquire a heavyweight lock, because any heavyweight locks acquired to
                               1370                 :      * this point could block either directly against the parallel group
 2618                          1371                 :      * leader or against some process which in turn waits for a lock that
                               1372                 :      * conflicts with the parallel group leader, causing an undetected
                               1373                 :      * deadlock.  (If we can't join the lock group, the leader has gone away,
                               1374                 :      * so just exit quietly.)
                               1375                 :      */
 1029 andres                   1376 GIC        1298 :     if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
                               1377                 :                                fps->parallel_leader_pid))
 2618 rhaas                    1378 UIC           0 :         return;
                               1379                 : 
                               1380                 :     /*
                               1381                 :      * Restore transaction and statement start-time timestamps.  This must
                               1382                 :      * happen before anything that would start a transaction, else asserts in
                               1383                 :      * xact.c will fire.
                               1384                 :      */
 1646 tgl                      1385 GIC        1298 :     SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
                               1386                 : 
                               1387                 :     /*
 2186 tgl                      1388 ECB             :      * Identify the entry point to be called.  In theory this could result in
                               1389                 :      * loading an additional library, though most likely the entry point is in
 2186 tgl                      1390 EUB             :      * the core backend or in a library we just loaded.
                               1391                 :      */
 2134 tgl                      1392 GIC        1298 :     entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
 2186                          1393            1298 :     library_name = entrypointstate;
                               1394            1298 :     function_name = entrypointstate + strlen(library_name) + 1;
                               1395                 : 
                               1396            1298 :     entrypt = LookupParallelWorkerFunction(library_name, function_name);
 2186 tgl                      1397 ECB             : 
                               1398                 :     /* Restore database connection. */
 2901 rhaas                    1399 GIC        1298 :     BackgroundWorkerInitializeConnectionByOid(fps->database_id,
                               1400                 :                                               fps->authenticated_user_id,
                               1401                 :                                               0);
                               1402                 : 
                               1403                 :     /*
 2474 rhaas                    1404 ECB             :      * Set the client encoding to the database encoding, since that is what
                               1405                 :      * the leader will expect.
                               1406                 :      */
 2474 rhaas                    1407 GIC        1298 :     SetClientEncoding(GetDatabaseEncoding());
 2474 rhaas                    1408 ECB             : 
                               1409                 :     /*
                               1410                 :      * Load libraries that were loaded by original backend.  We want to do
 1662 tmunro                   1411                 :      * this before restoring GUCs, because the libraries might define custom
                               1412                 :      * variables.
                               1413                 :      */
 1662 tmunro                   1414 GIC        1298 :     libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
                               1415            1298 :     StartTransactionCommand();
                               1416            1298 :     RestoreLibraryState(libraryspace);
                               1417                 : 
                               1418                 :     /* Restore GUC values from launching backend. */
 2134 tgl                      1419 CBC        1298 :     gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
 2901 rhaas                    1420 GIC        1298 :     RestoreGUCState(gucspace);
                               1421            1298 :     CommitTransactionCommand();
                               1422                 : 
                               1423                 :     /* Crank up a transaction state appropriate to a parallel worker. */
 2134 tgl                      1424            1298 :     tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
 2901 rhaas                    1425            1298 :     StartParallelWorkerTransaction(tstatespace);
 2901 rhaas                    1426 ECB             : 
                               1427                 :     /* Restore combo CID state. */
 2134 tgl                      1428 CBC        1298 :     combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
 2901 rhaas                    1429 GIC        1298 :     RestoreComboCIDState(combocidspace);
                               1430                 : 
 2033 andres                   1431 ECB             :     /* Attach to the per-session DSM segment and contained objects. */
                               1432                 :     session_dsm_handle_space =
 2033 andres                   1433 CBC        1298 :         shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
 2033 andres                   1434 GIC        1298 :     AttachSession(*(dsm_handle *) session_dsm_handle_space);
                               1435                 : 
  592 rhaas                    1436 ECB             :     /*
                               1437                 :      * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
                               1438                 :      * the leader has serialized the transaction snapshot and we must restore
                               1439                 :      * it. At lower isolation levels, there is no transaction-lifetime
                               1440                 :      * snapshot, but we need TransactionXmin to get set to a value which is
                               1441                 :      * less than or equal to the xmin of every snapshot that will be used by
                               1442                 :      * this worker. The easiest way to accomplish that is to install the
                               1443                 :      * active snapshot as the transaction snapshot. Code running in this
                               1444                 :      * parallel worker might take new snapshots via GetTransactionSnapshot()
                               1445                 :      * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
                               1446                 :      * snapshot older than the active snapshot.
                               1447                 :      */
 2134 tgl                      1448 GIC        1298 :     asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
  592 rhaas                    1449            1298 :     tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
                               1450            1298 :     asnapshot = RestoreSnapshot(asnapspace);
                               1451            1298 :     tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
                               1452            1298 :     RestoreTransactionSnapshot(tsnapshot,
                               1453            1298 :                                fps->parallel_leader_pgproc);
                               1454            1298 :     PushActiveSnapshot(asnapshot);
                               1455                 : 
                               1456                 :     /*
                               1457                 :      * We've changed which tuples we can see, and must therefore invalidate
                               1458                 :      * system caches.
                               1459                 :      */
 2732 rhaas                    1460 CBC        1298 :     InvalidateSystemCaches();
 2732 rhaas                    1461 ECB             : 
 1988                          1462                 :     /*
                               1463                 :      * Restore current role id.  Skip verifying whether session user is
                               1464                 :      * allowed to become this role and blindly restore the leader's state for
                               1465                 :      * current role.
                               1466                 :      */
 1988 rhaas                    1467 GIC        1298 :     SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);
                               1468                 : 
                               1469                 :     /* Restore user ID and security context. */
 2901                          1470            1298 :     SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
                               1471                 : 
 2495 tgl                      1472 ECB             :     /* Restore temp-namespace state to ensure search path matches leader's. */
 2495 tgl                      1473 GIC        1298 :     SetTempNamespaceState(fps->temp_namespace_id,
                               1474                 :                           fps->temp_toast_namespace_id);
                               1475                 : 
                               1476                 :     /* Restore pending syncs. */
 1100 noah                     1477            1298 :     pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
                               1478                 :                                        false);
 1100 noah                     1479 CBC        1298 :     RestorePendingSyncs(pendingsyncsspace);
                               1480                 : 
                               1481                 :     /* Restore reindex state. */
 1906 rhaas                    1482            1298 :     reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
 1906 rhaas                    1483 GIC        1298 :     RestoreReindexState(reindexspace);
                               1484                 : 
 1703 pg                       1485 ECB             :     /* Restore relmapper state. */
 1703 pg                       1486 GIC        1298 :     relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
                               1487            1298 :     RestoreRelationMap(relmapperspace);
                               1488                 : 
  824 tmunro                   1489 ECB             :     /* Restore uncommitted enums. */
  824 tmunro                   1490 GIC        1298 :     uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
  824 tmunro                   1491 ECB             :                                            false);
  824 tmunro                   1492 GIC        1298 :     RestoreUncommittedEnums(uncommittedenumsspace);
                               1493                 : 
                               1494                 :     /* Restore the ClientConnectionInfo. */
  228 michael                  1495 GNC        1298 :     clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
                               1496                 :                                          false);
                               1497            1298 :     RestoreClientConnectionInfo(clientconninfospace);
                               1498                 : 
                               1499                 :     /*
                               1500                 :      * Initialize SystemUser now that MyClientConnectionInfo is restored.
                               1501                 :      * Also ensure that auth_method is actually valid, aka authn_id is not NULL.
                               1502                 :      */
  192                          1503            1298 :     if (MyClientConnectionInfo.authn_id)
                               1504               4 :         InitializeSystemUser(MyClientConnectionInfo.authn_id,
                               1505                 :                              hba_authname(MyClientConnectionInfo.auth_method));
                               1506                 : 
 1486 tmunro                   1507 ECB             :     /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
 1486 tmunro                   1508 CBC        1298 :     AttachSerializableXact(fps->serializable_xact_handle);
                               1509                 : 
                               1510                 :     /*
 2878 bruce                    1511 ECB             :      * We've initialized all of our state now; nothing should change
                               1512                 :      * hereafter.
                               1513                 :      */
 2732 rhaas                    1514 GIC        1298 :     InitializingParallelWorker = false;
 2901 rhaas                    1515 CBC        1298 :     EnterParallelMode();
                               1516                 : 
 2901 rhaas                    1517 ECB             :     /*
                               1518                 :      * Time to do the real work: invoke the caller-supplied code.
                               1519                 :      */
 2186 tgl                      1520 CBC        1298 :     entrypt(seg, toc);
                               1521                 : 
 2901 rhaas                    1522 ECB             :     /* Must exit parallel mode to pop active snapshot. */
 2901 rhaas                    1523 GIC        1295 :     ExitParallelMode();
                               1524                 : 
                               1525                 :     /* Must pop active snapshot so snapmgr.c doesn't complain. */
                               1526            1295 :     PopActiveSnapshot();
                               1527                 : 
 2901 rhaas                    1528 ECB             :     /* Shut down the parallel-worker transaction. */
 2901 rhaas                    1529 CBC        1295 :     EndParallelWorkerTransaction();
                               1530                 : 
                               1531                 :     /* Detach from the per-session DSM segment. */
 2033 andres                   1532 GIC        1295 :     DetachSession();
 2033 andres                   1533 ECB             : 
                               1534                 :     /* Report success. */
 2901 rhaas                    1535 GIC        1295 :     pq_putmessage('X', NULL, 0);
                               1536                 : }
                               1537                 : 
                               1538                 : /*
 2901 rhaas                    1539 ECB             :  * Update shared memory with the ending location of the last WAL record we
                               1540                 :  * wrote, if it's greater than the value already stored there.
                               1541                 :  */
                               1542                 : void
 2901 rhaas                    1543 GIC        1295 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
                               1544                 : {
 2901 rhaas                    1545 CBC        1295 :     FixedParallelState *fps = MyFixedParallelState;
                               1546                 : 
 2901 rhaas                    1547 GIC        1295 :     Assert(fps != NULL);
 2901 rhaas                    1548 CBC        1295 :     SpinLockAcquire(&fps->mutex);
 2901 rhaas                    1549 GIC        1295 :     if (fps->last_xlog_end < last_xlog_end)
                               1550              58 :         fps->last_xlog_end = last_xlog_end;
 2901 rhaas                    1551 CBC        1295 :     SpinLockRelease(&fps->mutex);
 2901 rhaas                    1552 GIC        1295 : }
                               1553                 : 
 1902 rhaas                    1554 ECB             : /*
                               1555                 :  * Make sure the leader tries to read from our error queue one more time.
                               1556                 :  * This guards against the case where we exit uncleanly without sending an
                               1557                 :  * ErrorResponse to the leader, for example because some code calls proc_exit
                               1558                 :  * directly.
                               1559                 :  *
  739 andres                   1560                 :  * Also explicitly detach from dsm segment so that subsystems using
                               1561                 :  * on_dsm_detach() have a chance to send stats before the stats subsystem is
                               1562                 :  * shut down as part of a before_shmem_exit() hook.
                               1563                 :  *
                               1564                 :  * One might think this could instead be solved by carefully ordering the
                               1565                 :  * attaching to dsm segments, so that the pgstats segments get detached from
                               1566                 :  * later than the parallel query one. That turns out to not work because the
                               1567                 :  * stats hash might need to grow which can cause new segments to be allocated,
                               1568                 :  * which then will be detached from earlier.
                               1569                 :  */
 1902 rhaas                    1570                 : static void
 1902 rhaas                    1571 GIC        1298 : ParallelWorkerShutdown(int code, Datum arg)
 1902 rhaas                    1572 ECB             : {
 1029 andres                   1573 CBC        1298 :     SendProcSignal(ParallelLeaderPid,
 1902 rhaas                    1574 ECB             :                    PROCSIG_PARALLEL_MESSAGE,
 1029 andres                   1575                 :                    ParallelLeaderBackendId);
  739                          1576                 : 
  739 andres                   1577 CBC        1298 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
 1902 rhaas                    1578 GIC        1298 : }
                               1579                 : 
                               1580                 : /*
                               1581                 :  * Look up (and possibly load) a parallel worker entry point function.
                               1582                 :  *
                               1583                 :  * For functions contained in the core code, we use library name "postgres"
                               1584                 :  * and consult the InternalParallelWorkers array.  External functions are
                               1585                 :  * looked up, and loaded if necessary, using load_external_function().
                               1586                 :  *
                               1587                 :  * The point of this is to pass function names as strings across process
                               1588                 :  * boundaries.  We can't pass actual function addresses because of the
                               1589                 :  * possibility that the function has been loaded at a different address
                               1590                 :  * in a different process.  This is obviously a hazard for functions in
                               1591                 :  * loadable libraries, but it can happen even for functions in the core code
                               1592                 :  * on platforms using EXEC_BACKEND (e.g., Windows).
                               1593                 :  *
                               1594                 :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
                               1595                 :  * in favor of applying load_external_function() for core functions too;
 2186 tgl                      1596 ECB             :  * but that raises portability issues that are not worth addressing now.
                               1597                 :  */
                               1598                 : static parallel_worker_main_type
 2186 tgl                      1599 GIC        1298 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
                               1600                 : {
                               1601                 :     /*
 2186 tgl                      1602 ECB             :      * If the function is to be loaded from postgres itself, search the
                               1603                 :      * InternalParallelWorkers array.
                               1604                 :      */
 2186 tgl                      1605 GIC        1298 :     if (strcmp(libraryname, "postgres") == 0)
                               1606                 :     {
                               1607                 :         int         i;
                               1608                 : 
                               1609            1403 :         for (i = 0; i < lengthof(InternalParallelWorkers); i++)
                               1610                 :         {
                               1611            1403 :             if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
                               1612            1298 :                 return InternalParallelWorkers[i].fn_addr;
                               1613                 :         }
                               1614                 : 
                               1615                 :         /* We can only reach this by programming error. */
 2186 tgl                      1616 UIC           0 :         elog(ERROR, "internal function \"%s\" not found", funcname);
                               1617                 :     }
                               1618                 : 
                               1619                 :     /* Otherwise load from external library. */
                               1620               0 :     return (parallel_worker_main_type)
                               1621               0 :         load_external_function(libraryname, funcname, true, NULL);
                               1622                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a