LCOV - differential code coverage report
Current view: top level - src/backend/access/transam - parallel.c (source / functions) Coverage Total Hit UNC LBC UBC GBC GNC CBC DUB DCB
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 90.3 % 483 436 1 46 12 424 1 16
Current Date: 2024-04-14 14:21:10 Functions: 100.0 % 19 19 3 16
Baseline: 16@8cea358b128 Branches: 62.4 % 205 128 1 76 2 126
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed [..60] days: 100.0 % 2 2 2
(180,240] days: 75.0 % 4 3 1 3
(240..) days: 90.4 % 477 431 46 7 424
Function coverage date bins:
(240..) days: 100.0 % 19 19 3 16
Branch coverage date bins:
(240..) days: 62.4 % 205 128 1 76 2 126

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * parallel.c
                                  4                 :                :  *    Infrastructure for launching parallel workers
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
                                  7                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :                :  *
                                  9                 :                :  * IDENTIFICATION
                                 10                 :                :  *    src/backend/access/transam/parallel.c
                                 11                 :                :  *
                                 12                 :                :  *-------------------------------------------------------------------------
                                 13                 :                :  */
                                 14                 :                : 
                                 15                 :                : #include "postgres.h"
                                 16                 :                : 
                                 17                 :                : #include "access/brin.h"
                                 18                 :                : #include "access/nbtree.h"
                                 19                 :                : #include "access/parallel.h"
                                 20                 :                : #include "access/session.h"
                                 21                 :                : #include "access/xact.h"
                                 22                 :                : #include "access/xlog.h"
                                 23                 :                : #include "catalog/index.h"
                                 24                 :                : #include "catalog/namespace.h"
                                 25                 :                : #include "catalog/pg_enum.h"
                                 26                 :                : #include "catalog/storage.h"
                                 27                 :                : #include "commands/async.h"
                                 28                 :                : #include "commands/vacuum.h"
                                 29                 :                : #include "executor/execParallel.h"
                                 30                 :                : #include "libpq/libpq.h"
                                 31                 :                : #include "libpq/pqformat.h"
                                 32                 :                : #include "libpq/pqmq.h"
                                 33                 :                : #include "miscadmin.h"
                                 34                 :                : #include "optimizer/optimizer.h"
                                 35                 :                : #include "pgstat.h"
                                 36                 :                : #include "storage/ipc.h"
                                 37                 :                : #include "storage/predicate.h"
                                 38                 :                : #include "storage/spin.h"
                                 39                 :                : #include "tcop/tcopprot.h"
                                 40                 :                : #include "utils/combocid.h"
                                 41                 :                : #include "utils/guc.h"
                                 42                 :                : #include "utils/inval.h"
                                 43                 :                : #include "utils/memutils.h"
                                 44                 :                : #include "utils/relmapper.h"
                                 45                 :                : #include "utils/snapmgr.h"
                                 46                 :                : 
                                 47                 :                : /*
                                 48                 :                :  * We don't want to waste a lot of memory on an error queue which, most of
                                 49                 :                :  * the time, will process only a handful of small messages.  However, it is
                                 50                 :                :  * desirable to make it large enough that a typical ErrorResponse can be sent
                                 51                 :                :  * without blocking.  That way, a worker that errors out can write the whole
                                 52                 :                :  * message into the queue and terminate without waiting for the user backend.
                                 53                 :                :  */
                                 54                 :                : #define PARALLEL_ERROR_QUEUE_SIZE           16384
                                 55                 :                : 
                                 56                 :                : /* Magic number for parallel context TOC. */
                                 57                 :                : #define PARALLEL_MAGIC                      0x50477c7c
                                 58                 :                : 
                                 59                 :                : /*
                                 60                 :                :  * Magic numbers for per-context parallel state sharing.  Higher-level code
                                 61                 :                :  * should use smaller values, leaving these very large ones for use by this
                                 62                 :                :  * module.
                                 63                 :                :  */
                                 64                 :                : #define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
                                 65                 :                : #define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
                                 66                 :                : #define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
                                 67                 :                : #define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
                                 68                 :                : #define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
                                 69                 :                : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
                                 70                 :                : #define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
                                 71                 :                : #define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
                                 72                 :                : #define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
                                 73                 :                : #define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
                                 74                 :                : #define PARALLEL_KEY_PENDING_SYNCS          UINT64CONST(0xFFFFFFFFFFFF000B)
                                 75                 :                : #define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000C)
                                 76                 :                : #define PARALLEL_KEY_RELMAPPER_STATE        UINT64CONST(0xFFFFFFFFFFFF000D)
                                 77                 :                : #define PARALLEL_KEY_UNCOMMITTEDENUMS       UINT64CONST(0xFFFFFFFFFFFF000E)
                                 78                 :                : #define PARALLEL_KEY_CLIENTCONNINFO         UINT64CONST(0xFFFFFFFFFFFF000F)
                                 79                 :                : 
                                 80                 :                : /* Fixed-size parallel state. */
                                 81                 :                : typedef struct FixedParallelState
                                 82                 :                : {
                                 83                 :                :     /* Fixed-size state that workers must restore. */
                                 84                 :                :     Oid         database_id;
                                 85                 :                :     Oid         authenticated_user_id;
                                 86                 :                :     Oid         current_user_id;
                                 87                 :                :     Oid         outer_user_id;
                                 88                 :                :     Oid         temp_namespace_id;
                                 89                 :                :     Oid         temp_toast_namespace_id;
                                 90                 :                :     int         sec_context;
                                 91                 :                :     bool        is_superuser;
                                 92                 :                :     PGPROC     *parallel_leader_pgproc;
                                 93                 :                :     pid_t       parallel_leader_pid;
                                 94                 :                :     ProcNumber  parallel_leader_proc_number;
                                 95                 :                :     TimestampTz xact_ts;
                                 96                 :                :     TimestampTz stmt_ts;
                                 97                 :                :     SerializableXactHandle serializable_xact_handle;
                                 98                 :                : 
                                 99                 :                :     /* Mutex protects remaining fields. */
                                100                 :                :     slock_t     mutex;
                                101                 :                : 
                                102                 :                :     /* Maximum XactLastRecEnd of any worker. */
                                103                 :                :     XLogRecPtr  last_xlog_end;
                                104                 :                : } FixedParallelState;
                                105                 :                : 
                                106                 :                : /*
                                107                 :                :  * Our parallel worker number.  We initialize this to -1, meaning that we are
                                108                 :                :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
                                109                 :                :  * and < the number of workers before any user code is invoked; each parallel
                                110                 :                :  * worker will get a different parallel worker number.
                                111                 :                :  */
                                112                 :                : int         ParallelWorkerNumber = -1;
                                113                 :                : 
                                114                 :                : /* Is there a parallel message pending which we need to receive? */
                                115                 :                : volatile sig_atomic_t ParallelMessagePending = false;
                                116                 :                : 
                                117                 :                : /* Are we initializing a parallel worker? */
                                118                 :                : bool        InitializingParallelWorker = false;
                                119                 :                : 
                                120                 :                : /* Pointer to our fixed parallel state. */
                                121                 :                : static FixedParallelState *MyFixedParallelState;
                                122                 :                : 
                                123                 :                : /* List of active parallel contexts. */
                                124                 :                : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
                                125                 :                : 
                                126                 :                : /* Backend-local copy of data from FixedParallelState. */
                                127                 :                : static pid_t ParallelLeaderPid;
                                128                 :                : 
                                129                 :                : /*
                                130                 :                :  * List of internal parallel worker entry points.  We need this for
                                131                 :                :  * reasons explained in LookupParallelWorkerFunction(), below.
                                132                 :                :  */
                                133                 :                : static const struct
                                134                 :                : {
                                135                 :                :     const char *fn_name;
                                136                 :                :     parallel_worker_main_type fn_addr;
                                137                 :                : }           InternalParallelWorkers[] =
                                138                 :                : 
                                139                 :                : {
                                140                 :                :     {
                                141                 :                :         "ParallelQueryMain", ParallelQueryMain
                                142                 :                :     },
                                143                 :                :     {
                                144                 :                :         "_bt_parallel_build_main", _bt_parallel_build_main
                                145                 :                :     },
                                146                 :                :     {
                                147                 :                :         "_brin_parallel_build_main", _brin_parallel_build_main
                                148                 :                :     },
                                149                 :                :     {
                                150                 :                :         "parallel_vacuum_main", parallel_vacuum_main
                                151                 :                :     }
                                152                 :                : };
                                153                 :                : 
                                154                 :                : /* Private functions. */
                                155                 :                : static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
                                156                 :                : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
                                157                 :                : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
                                158                 :                : static void ParallelWorkerShutdown(int code, Datum arg);
                                159                 :                : 
                                160                 :                : 
                                161                 :                : /*
                                162                 :                :  * Establish a new parallel context.  This should be done after entering
                                163                 :                :  * parallel mode, and (unless there is an error) the context should be
                                164                 :                :  * destroyed before exiting the current subtransaction.
                                165                 :                :  */
                                166                 :                : ParallelContext *
 2557 tgl@sss.pgh.pa.us         167                 :CBC         414 : CreateParallelContext(const char *library_name, const char *function_name,
                                168                 :                :                       int nworkers)
                                169                 :                : {
                                170                 :                :     MemoryContext oldcontext;
                                171                 :                :     ParallelContext *pcxt;
                                172                 :                : 
                                173                 :                :     /* It is unsafe to create a parallel context if not in parallel mode. */
 3272 rhaas@postgresql.org      174         [ -  + ]:            414 :     Assert(IsInParallelMode());
                                175                 :                : 
                                176                 :                :     /* Number of workers should be non-negative. */
                                177         [ -  + ]:            414 :     Assert(nworkers >= 0);
                                178                 :                : 
                                179                 :                :     /* We might be running in a short-lived memory context. */
                                180                 :            414 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
                                181                 :                : 
                                182                 :                :     /* Initialize a new ParallelContext. */
                                183                 :            414 :     pcxt = palloc0(sizeof(ParallelContext));
                                184                 :            414 :     pcxt->subid = GetCurrentSubTransactionId();
                                185                 :            414 :     pcxt->nworkers = nworkers;
 1546 akapila@postgresql.o      186                 :            414 :     pcxt->nworkers_to_launch = nworkers;
 2557 tgl@sss.pgh.pa.us         187                 :            414 :     pcxt->library_name = pstrdup(library_name);
                                188                 :            414 :     pcxt->function_name = pstrdup(function_name);
 3272 rhaas@postgresql.org      189                 :            414 :     pcxt->error_context_stack = error_context_stack;
                                190                 :            414 :     shm_toc_initialize_estimator(&pcxt->estimator);
                                191                 :            414 :     dlist_push_head(&pcxt_list, &pcxt->node);
                                192                 :                : 
                                193                 :                :     /* Restore previous memory context. */
                                194                 :            414 :     MemoryContextSwitchTo(oldcontext);
                                195                 :                : 
                                196                 :            414 :     return pcxt;
                                197                 :                : }
                                198                 :                : 
                                199                 :                : /*
                                200                 :                :  * Establish the dynamic shared memory segment for a parallel context and
                                201                 :                :  * copy state and other bookkeeping information that will be needed by
                                202                 :                :  * parallel workers into it.
                                203                 :                :  */
                                204                 :                : void
                                205                 :            414 : InitializeParallelDSM(ParallelContext *pcxt)
                                206                 :                : {
                                207                 :                :     MemoryContext oldcontext;
 3249 bruce@momjian.us          208                 :            414 :     Size        library_len = 0;
                                209                 :            414 :     Size        guc_len = 0;
                                210                 :            414 :     Size        combocidlen = 0;
                                211                 :            414 :     Size        tsnaplen = 0;
                                212                 :            414 :     Size        asnaplen = 0;
                                213                 :            414 :     Size        tstatelen = 0;
 1471 noah@leadboat.com         214                 :            414 :     Size        pendingsyncslen = 0;
 2277 rhaas@postgresql.org      215                 :            414 :     Size        reindexlen = 0;
 2074 pg@bowt.ie                216                 :            414 :     Size        relmapperlen = 0;
 1195 tmunro@postgresql.or      217                 :            414 :     Size        uncommittedenumslen = 0;
  599 michael@paquier.xyz       218                 :            414 :     Size        clientconninfolen = 0;
 3249 bruce@momjian.us          219                 :            414 :     Size        segsize = 0;
                                220                 :                :     int         i;
                                221                 :                :     FixedParallelState *fps;
 2404 andres@anarazel.de        222                 :            414 :     dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
 3272 rhaas@postgresql.org      223                 :            414 :     Snapshot    transaction_snapshot = GetTransactionSnapshot();
                                224                 :            414 :     Snapshot    active_snapshot = GetActiveSnapshot();
                                225                 :                : 
                                226                 :                :     /* We might be running in a very short-lived memory context. */
                                227                 :            414 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
                                228                 :                : 
                                229                 :                :     /* Allow space to store the fixed-size parallel state. */
                                230                 :            414 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
                                231                 :            414 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
                                232                 :                : 
                                233                 :                :     /*
                                234                 :                :      * Normally, the user will have requested at least one worker process, but
                                235                 :                :      * if by chance they have not, we can skip a bunch of things here.
                                236                 :                :      */
 2404 andres@anarazel.de        237         [ +  - ]:            414 :     if (pcxt->nworkers > 0)
                                238                 :                :     {
                                239                 :                :         /* Get (or create) the per-session DSM segment's handle. */
                                240                 :            414 :         session_dsm_handle = GetSessionDsmHandle();
                                241                 :                : 
                                242                 :                :         /*
                                243                 :                :          * If we weren't able to create a per-session DSM segment, then we can
                                244                 :                :          * continue but we can't safely launch any workers because their
                                245                 :                :          * record typmods would be incompatible so they couldn't exchange
                                246                 :                :          * tuples.
                                247                 :                :          */
                                248         [ -  + ]:            414 :         if (session_dsm_handle == DSM_HANDLE_INVALID)
 2404 andres@anarazel.de        249                 :UBC           0 :             pcxt->nworkers = 0;
                                250                 :                :     }
                                251                 :                : 
 3272 rhaas@postgresql.org      252         [ +  - ]:CBC         414 :     if (pcxt->nworkers > 0)
                                253                 :                :     {
                                254                 :                :         /* Estimate space for various kinds of state sharing. */
                                255                 :            414 :         library_len = EstimateLibraryStateSpace();
                                256                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, library_len);
                                257                 :            414 :         guc_len = EstimateGUCStateSpace();
                                258                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
                                259                 :            414 :         combocidlen = EstimateComboCIDStateSpace();
                                260                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
  963                           261         [ +  + ]:            414 :         if (IsolationUsesXactSnapshot())
                                262                 :                :         {
                                263                 :             11 :             tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
                                264                 :             11 :             shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
                                265                 :                :         }
 3272                           266                 :            414 :         asnaplen = EstimateSnapshotSpace(active_snapshot);
                                267                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
                                268                 :            414 :         tstatelen = EstimateTransactionStateSpace();
                                269                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
 2404 andres@anarazel.de        270                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
 1471 noah@leadboat.com         271                 :            414 :         pendingsyncslen = EstimatePendingSyncsSpace();
                                272                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
 2277 rhaas@postgresql.org      273                 :            414 :         reindexlen = EstimateReindexStateSpace();
                                274                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
 2074 pg@bowt.ie                275                 :            414 :         relmapperlen = EstimateRelationMapSpace();
                                276                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
 1195 tmunro@postgresql.or      277                 :            414 :         uncommittedenumslen = EstimateUncommittedEnumsSpace();
                                278                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
  599 michael@paquier.xyz       279                 :            414 :         clientconninfolen = EstimateClientConnectionInfoSpace();
                                280                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
                                281                 :                :         /* If you add more chunks here, you probably need to add keys. */
                                282                 :            414 :         shm_toc_estimate_keys(&pcxt->estimator, 12);
                                283                 :                : 
                                284                 :                :         /* Estimate space need for error queues. */
                                285                 :                :         StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
                                286                 :                :                          PARALLEL_ERROR_QUEUE_SIZE,
                                287                 :                :                          "parallel error queue size not buffer-aligned");
 3272 rhaas@postgresql.org      288                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator,
                                289                 :                :                                mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                290                 :                :                                         pcxt->nworkers));
                                291                 :            414 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
                                292                 :                : 
                                293                 :                :         /* Estimate how much we'll need for the entrypoint info. */
 2557 tgl@sss.pgh.pa.us         294                 :            414 :         shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
                                295                 :                :                                strlen(pcxt->function_name) + 2);
                                296                 :            414 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
                                297                 :                :     }
                                298                 :                : 
                                299                 :                :     /*
                                300                 :                :      * Create DSM and initialize with new table of contents.  But if the user
                                301                 :                :      * didn't request any workers, then don't bother creating a dynamic shared
                                302                 :                :      * memory segment; instead, just use backend-private memory.
                                303                 :                :      *
                                304                 :                :      * Also, if we can't create a dynamic shared memory segment because the
                                305                 :                :      * maximum number of segments have already been created, then fall back to
                                306                 :                :      * backend-private memory, and plan not to use any workers.  We hope this
                                307                 :                :      * won't happen very often, but it's better to abandon the use of
                                308                 :                :      * parallelism than to fail outright.
                                309                 :                :      */
 3272 rhaas@postgresql.org      310                 :            414 :     segsize = shm_toc_estimate(&pcxt->estimator);
 2859 tgl@sss.pgh.pa.us         311         [ +  - ]:            414 :     if (pcxt->nworkers > 0)
 3272 rhaas@postgresql.org      312                 :            414 :         pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
                                313         [ +  - ]:            414 :     if (pcxt->seg != NULL)
                                314                 :            414 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
                                315                 :                :                                    dsm_segment_address(pcxt->seg),
                                316                 :                :                                    segsize);
                                317                 :                :     else
                                318                 :                :     {
 3272 rhaas@postgresql.org      319                 :UBC           0 :         pcxt->nworkers = 0;
 3267                           320                 :              0 :         pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
                                321                 :              0 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
                                322                 :                :                                    segsize);
                                323                 :                :     }
                                324                 :                : 
                                325                 :                :     /* Initialize fixed-size state in shared memory. */
                                326                 :                :     fps = (FixedParallelState *)
 3272 rhaas@postgresql.org      327                 :CBC         414 :         shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
                                328                 :            414 :     fps->database_id = MyDatabaseId;
                                329                 :            414 :     fps->authenticated_user_id = GetAuthenticatedUserId();
 2359                           330                 :            414 :     fps->outer_user_id = GetCurrentRoleId();
  277 nathan@postgresql.or      331                 :GNC         414 :     fps->is_superuser = current_role_is_superuser;
 3272 rhaas@postgresql.org      332                 :CBC         414 :     GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
 2866 tgl@sss.pgh.pa.us         333                 :            414 :     GetTempNamespaceState(&fps->temp_namespace_id,
                                334                 :                :                           &fps->temp_toast_namespace_id);
 1400 andres@anarazel.de        335                 :            414 :     fps->parallel_leader_pgproc = MyProc;
                                336                 :            414 :     fps->parallel_leader_pid = MyProcPid;
   42 heikki.linnakangas@i      337                 :GNC         414 :     fps->parallel_leader_proc_number = MyProcNumber;
 2017 tgl@sss.pgh.pa.us         338                 :CBC         414 :     fps->xact_ts = GetCurrentTransactionStartTimestamp();
                                339                 :            414 :     fps->stmt_ts = GetCurrentStatementStartTimestamp();
 1857 tmunro@postgresql.or      340                 :            414 :     fps->serializable_xact_handle = ShareSerializableXact();
 3272 rhaas@postgresql.org      341                 :            414 :     SpinLockInit(&fps->mutex);
                                342                 :            414 :     fps->last_xlog_end = 0;
                                343                 :            414 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
                                344                 :                : 
                                345                 :                :     /* We can skip the rest of this if we're not budgeting for any workers. */
                                346         [ +  - ]:            414 :     if (pcxt->nworkers > 0)
                                347                 :                :     {
                                348                 :                :         char       *libraryspace;
                                349                 :                :         char       *gucspace;
                                350                 :                :         char       *combocidspace;
                                351                 :                :         char       *tsnapspace;
                                352                 :                :         char       *asnapspace;
                                353                 :                :         char       *tstatespace;
                                354                 :                :         char       *pendingsyncsspace;
                                355                 :                :         char       *reindexspace;
                                356                 :                :         char       *relmapperspace;
                                357                 :                :         char       *error_queue_space;
                                358                 :                :         char       *session_dsm_handle_space;
                                359                 :                :         char       *entrypointstate;
                                360                 :                :         char       *uncommittedenumsspace;
                                361                 :                :         char       *clientconninfospace;
                                362                 :                :         Size        lnamelen;
                                363                 :                : 
                                364                 :                :         /* Serialize shared libraries we have loaded. */
                                365                 :            414 :         libraryspace = shm_toc_allocate(pcxt->toc, library_len);
                                366                 :            414 :         SerializeLibraryState(library_len, libraryspace);
                                367                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
                                368                 :                : 
                                369                 :                :         /* Serialize GUC settings. */
                                370                 :            414 :         gucspace = shm_toc_allocate(pcxt->toc, guc_len);
                                371                 :            414 :         SerializeGUCState(guc_len, gucspace);
                                372                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
                                373                 :                : 
                                374                 :                :         /* Serialize combo CID state. */
                                375                 :            414 :         combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
                                376                 :            414 :         SerializeComboCIDState(combocidlen, combocidspace);
                                377                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
                                378                 :                : 
                                379                 :                :         /*
                                380                 :                :          * Serialize the transaction snapshot if the transaction isolation
                                381                 :                :          * level uses a transaction snapshot.
                                382                 :                :          */
  963                           383         [ +  + ]:            414 :         if (IsolationUsesXactSnapshot())
                                384                 :                :         {
                                385                 :             11 :             tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
                                386                 :             11 :             SerializeSnapshot(transaction_snapshot, tsnapspace);
                                387                 :             11 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
                                388                 :                :                            tsnapspace);
                                389                 :                :         }
                                390                 :                : 
                                391                 :                :         /* Serialize the active snapshot. */
 3272                           392                 :            414 :         asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
                                393                 :            414 :         SerializeSnapshot(active_snapshot, asnapspace);
                                394                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
                                395                 :                : 
                                396                 :                :         /* Provide the handle for per-session segment. */
 2404 andres@anarazel.de        397                 :            414 :         session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
                                398                 :                :                                                     sizeof(dsm_handle));
                                399                 :            414 :         *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
                                400                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
                                401                 :                :                        session_dsm_handle_space);
                                402                 :                : 
                                403                 :                :         /* Serialize transaction state. */
 3272 rhaas@postgresql.org      404                 :            414 :         tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
                                405                 :            414 :         SerializeTransactionState(tstatelen, tstatespace);
                                406                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
                                407                 :                : 
                                408                 :                :         /* Serialize pending syncs. */
 1471 noah@leadboat.com         409                 :            414 :         pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
                                410                 :            414 :         SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
                                411                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
                                412                 :                :                        pendingsyncsspace);
                                413                 :                : 
                                414                 :                :         /* Serialize reindex state. */
 2277 rhaas@postgresql.org      415                 :            414 :         reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
                                416                 :            414 :         SerializeReindexState(reindexlen, reindexspace);
                                417                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
                                418                 :                : 
                                419                 :                :         /* Serialize relmapper state. */
 2074 pg@bowt.ie                420                 :            414 :         relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
                                421                 :            414 :         SerializeRelationMap(relmapperlen, relmapperspace);
                                422                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
                                423                 :                :                        relmapperspace);
                                424                 :                : 
                                425                 :                :         /* Serialize uncommitted enum state. */
 1195 tmunro@postgresql.or      426                 :            414 :         uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
                                427                 :                :                                                  uncommittedenumslen);
                                428                 :            414 :         SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
                                429                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
                                430                 :                :                        uncommittedenumsspace);
                                431                 :                : 
                                432                 :                :         /* Serialize our ClientConnectionInfo. */
  599 michael@paquier.xyz       433                 :            414 :         clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
                                434                 :            414 :         SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
                                435                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
                                436                 :                :                        clientconninfospace);
                                437                 :                : 
                                438                 :                :         /* Allocate space for worker information. */
 3272 rhaas@postgresql.org      439                 :            414 :         pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
                                440                 :                : 
                                441                 :                :         /*
                                442                 :                :          * Establish error queues in dynamic shared memory.
                                443                 :                :          *
                                444                 :                :          * These queues should be used only for transmitting ErrorResponse,
                                445                 :                :          * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
                                446                 :                :          * should be transmitted via separate (possibly larger?) queues.
                                447                 :                :          */
                                448                 :                :         error_queue_space =
 3249 bruce@momjian.us          449                 :            414 :             shm_toc_allocate(pcxt->toc,
                                450                 :                :                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
 2900 rhaas@postgresql.org      451                 :            414 :                                       pcxt->nworkers));
 3272                           452         [ +  + ]:           1364 :         for (i = 0; i < pcxt->nworkers; ++i)
                                453                 :                :         {
                                454                 :                :             char       *start;
                                455                 :                :             shm_mq     *mq;
                                456                 :                : 
                                457                 :            950 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
                                458                 :            950 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
                                459                 :            950 :             shm_mq_set_receiver(mq, MyProc);
                                460                 :            950 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
                                461                 :                :         }
                                462                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
                                463                 :                : 
                                464                 :                :         /*
                                465                 :                :          * Serialize entrypoint information.  It's unsafe to pass function
                                466                 :                :          * pointers across processes, as the function pointer may be different
                                467                 :                :          * in each process in EXEC_BACKEND builds, so we always pass library
                                468                 :                :          * and function name.  (We use library name "postgres" for functions
                                469                 :                :          * in the core backend.)
                                470                 :                :          */
 2557 tgl@sss.pgh.pa.us         471                 :            414 :         lnamelen = strlen(pcxt->library_name);
                                472                 :            414 :         entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
                                473                 :            414 :                                            strlen(pcxt->function_name) + 2);
                                474                 :            414 :         strcpy(entrypointstate, pcxt->library_name);
                                475                 :            414 :         strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
                                476                 :            414 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
                                477                 :                :     }
                                478                 :                : 
                                479                 :                :     /* Restore previous memory context. */
 3272 rhaas@postgresql.org      480                 :            414 :     MemoryContextSwitchTo(oldcontext);
                                481                 :            414 : }
                                482                 :                : 
                                483                 :                : /*
                                484                 :                :  * Reinitialize the dynamic shared memory segment for a parallel context such
                                485                 :                :  * that we could launch workers for it again.
                                486                 :                :  */
                                487                 :                : void
 3089                           488                 :            130 : ReinitializeParallelDSM(ParallelContext *pcxt)
                                489                 :                : {
                                490                 :                :     FixedParallelState *fps;
                                491                 :                : 
                                492                 :                :     /* Wait for any old workers to exit. */
 2859 tgl@sss.pgh.pa.us         493         [ +  - ]:            130 :     if (pcxt->nworkers_launched > 0)
                                494                 :                :     {
                                495                 :            130 :         WaitForParallelWorkersToFinish(pcxt);
                                496                 :            130 :         WaitForParallelWorkersToExit(pcxt);
                                497                 :            130 :         pcxt->nworkers_launched = 0;
 2263 rhaas@postgresql.org      498         [ +  - ]:            130 :         if (pcxt->known_attached_workers)
                                499                 :                :         {
                                500                 :            130 :             pfree(pcxt->known_attached_workers);
                                501                 :            130 :             pcxt->known_attached_workers = NULL;
                                502                 :            130 :             pcxt->nknown_attached_workers = 0;
                                503                 :                :         }
                                504                 :                :     }
                                505                 :                : 
                                506                 :                :     /* Reset a few bits of fixed parallel state to a clean state. */
 2505 tgl@sss.pgh.pa.us         507                 :            130 :     fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
 3089 rhaas@postgresql.org      508                 :            130 :     fps->last_xlog_end = 0;
                                509                 :                : 
                                510                 :                :     /* Recreate error queues (if they exist). */
 2263 tgl@sss.pgh.pa.us         511         [ +  - ]:            130 :     if (pcxt->nworkers > 0)
                                512                 :                :     {
                                513                 :                :         char       *error_queue_space;
                                514                 :                :         int         i;
                                515                 :                : 
                                516                 :                :         error_queue_space =
                                517                 :            130 :             shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
                                518         [ +  + ]:            543 :         for (i = 0; i < pcxt->nworkers; ++i)
                                519                 :                :         {
                                520                 :                :             char       *start;
                                521                 :                :             shm_mq     *mq;
                                522                 :                : 
                                523                 :            413 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
                                524                 :            413 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
                                525                 :            413 :             shm_mq_set_receiver(mq, MyProc);
                                526                 :            413 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
                                527                 :                :         }
                                528                 :                :     }
 3089 rhaas@postgresql.org      529                 :            130 : }
                                530                 :                : 
                                531                 :                : /*
                                532                 :                :  * Reinitialize parallel workers for a parallel context such that we could
                                533                 :                :  * launch a different number of workers.  This is required for cases where
                                534                 :                :  * we need to reuse the same DSM segment, but the number of workers can
                                535                 :                :  * vary from run-to-run.
                                536                 :                :  */
                                537                 :                : void
 1546 akapila@postgresql.o      538                 :             10 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
                                539                 :                : {
                                540                 :                :     /*
                                541                 :                :      * The number of workers that need to be launched must be less than the
                                542                 :                :      * number of workers with which the parallel context is initialized.
                                543                 :                :      */
                                544         [ -  + ]:             10 :     Assert(pcxt->nworkers >= nworkers_to_launch);
                                545                 :             10 :     pcxt->nworkers_to_launch = nworkers_to_launch;
                                546                 :             10 : }
                                547                 :                : 
                                548                 :                : /*
                                549                 :                :  * Launch parallel workers.
                                550                 :                :  */
                                551                 :                : void
 3272 rhaas@postgresql.org      552                 :            544 : LaunchParallelWorkers(ParallelContext *pcxt)
                                553                 :                : {
                                554                 :                :     MemoryContext oldcontext;
                                555                 :                :     BackgroundWorker worker;
                                556                 :                :     int         i;
 3249 bruce@momjian.us          557                 :            544 :     bool        any_registrations_failed = false;
                                558                 :                : 
                                559                 :                :     /* Skip this if we have no workers. */
 1546 akapila@postgresql.o      560   [ +  -  -  + ]:            544 :     if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
 3272 rhaas@postgresql.org      561                 :UBC           0 :         return;
                                562                 :                : 
                                563                 :                :     /* We need to be a lock group leader. */
 2989 rhaas@postgresql.org      564                 :CBC         544 :     BecomeLockGroupLeader();
                                565                 :                : 
                                566                 :                :     /* If we do have workers, we'd better have a DSM segment. */
 3272                           567         [ -  + ]:            544 :     Assert(pcxt->seg != NULL);
                                568                 :                : 
                                569                 :                :     /* We might be running in a short-lived memory context. */
                                570                 :            544 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
                                571                 :                : 
                                572                 :                :     /* Configure a worker. */
 2555 tgl@sss.pgh.pa.us         573                 :            544 :     memset(&worker, 0, sizeof(worker));
 3272 rhaas@postgresql.org      574                 :            544 :     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
                                575                 :                :              MyProcPid);
 2418 peter_e@gmx.net           576                 :            544 :     snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
 3272 rhaas@postgresql.org      577                 :            544 :     worker.bgw_flags =
                                578                 :                :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
                                579                 :                :         | BGWORKER_CLASS_PARALLEL;
                                580                 :            544 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
                                581                 :            544 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
 2571                           582                 :            544 :     sprintf(worker.bgw_library_name, "postgres");
                                583                 :            544 :     sprintf(worker.bgw_function_name, "ParallelWorkerMain");
 3272                           584                 :            544 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
                                585                 :            544 :     worker.bgw_notify_pid = MyProcPid;
                                586                 :                : 
                                587                 :                :     /*
                                588                 :                :      * Start workers.
                                589                 :                :      *
                                590                 :                :      * The caller must be able to tolerate ending up with fewer workers than
                                591                 :                :      * expected, so there is no need to throw an error here if registration
                                592                 :                :      * fails.  It wouldn't help much anyway, because registering the worker in
                                593                 :                :      * no way guarantees that it will start up and initialize successfully.
                                594                 :                :      */
 1546 akapila@postgresql.o      595         [ +  + ]:           1906 :     for (i = 0; i < pcxt->nworkers_to_launch; ++i)
                                596                 :                :     {
 3083 rhaas@postgresql.org      597                 :           1362 :         memcpy(worker.bgw_extra, &i, sizeof(int));
 3272                           598   [ +  +  +  + ]:           2695 :         if (!any_registrations_failed &&
                                599                 :           1333 :             RegisterDynamicBackgroundWorker(&worker,
                                600                 :           1333 :                                             &pcxt->worker[i].bgwhandle))
                                601                 :                :         {
                                602                 :           1322 :             shm_mq_set_handle(pcxt->worker[i].error_mqh,
                                603                 :           1322 :                               pcxt->worker[i].bgwhandle);
 3103                           604                 :           1322 :             pcxt->nworkers_launched++;
                                605                 :                :         }
                                606                 :                :         else
                                607                 :                :         {
                                608                 :                :             /*
                                609                 :                :              * If we weren't able to register the worker, then we've bumped up
                                610                 :                :              * against the max_worker_processes limit, and future
                                611                 :                :              * registrations will probably fail too, so arrange to skip them.
                                612                 :                :              * But we still have to execute this code for the remaining slots
                                613                 :                :              * to make sure that we forget about the error queues we budgeted
                                614                 :                :              * for those workers.  Otherwise, we'll wait for them to start,
                                615                 :                :              * but they never will.
                                616                 :                :              */
 3272                           617                 :             40 :             any_registrations_failed = true;
                                618                 :             40 :             pcxt->worker[i].bgwhandle = NULL;
 2418 tgl@sss.pgh.pa.us         619                 :             40 :             shm_mq_detach(pcxt->worker[i].error_mqh);
 3272 rhaas@postgresql.org      620                 :             40 :             pcxt->worker[i].error_mqh = NULL;
                                621                 :                :         }
                                622                 :                :     }
                                623                 :                : 
                                624                 :                :     /*
                                625                 :                :      * Now that nworkers_launched has taken its final value, we can initialize
                                626                 :                :      * known_attached_workers.
                                627                 :                :      */
 2273                           628         [ +  + ]:            544 :     if (pcxt->nworkers_launched > 0)
                                629                 :                :     {
 2263                           630                 :            534 :         pcxt->known_attached_workers =
 2273                           631                 :            534 :             palloc0(sizeof(bool) * pcxt->nworkers_launched);
 2263                           632                 :            534 :         pcxt->nknown_attached_workers = 0;
                                633                 :                :     }
                                634                 :                : 
                                635                 :                :     /* Restore previous memory context. */
 3272                           636                 :            544 :     MemoryContextSwitchTo(oldcontext);
                                637                 :                : }
                                638                 :                : 
                                639                 :                : /*
                                640                 :                :  * Wait for all workers to attach to their error queues, and throw an error if
                                641                 :                :  * any worker fails to do this.
                                642                 :                :  *
                                643                 :                :  * Callers can assume that if this function returns successfully, then the
                                644                 :                :  * number of workers given by pcxt->nworkers_launched have initialized and
                                645                 :                :  * attached to their error queues.  Whether or not these workers are guaranteed
                                646                 :                :  * to still be running depends on what code the caller asked them to run;
                                647                 :                :  * this function does not guarantee that they have not exited.  However, it
                                648                 :                :  * does guarantee that any workers which exited must have done so cleanly and
                                649                 :                :  * after successfully performing the work with which they were tasked.
                                650                 :                :  *
                                651                 :                :  * If this function is not called, then some of the workers that were launched
                                652                 :                :  * may not have been started due to a fork() failure, or may have exited during
                                653                 :                :  * early startup prior to attaching to the error queue, so nworkers_launched
                                654                 :                :  * cannot be viewed as completely reliable.  It will never be less than the
                                655                 :                :  * number of workers which actually started, but it might be more.  Any workers
                                656                 :                :  * that failed to start will still be discovered by
                                657                 :                :  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
                                658                 :                :  * provided that function is eventually reached.
                                659                 :                :  *
                                660                 :                :  * In general, the leader process should do as much work as possible before
                                661                 :                :  * calling this function.  fork() failures and other early-startup failures
                                662                 :                :  * are very uncommon, and having the leader sit idle when it could be doing
                                663                 :                :  * useful work is undesirable.  However, if the leader needs to wait for
                                664                 :                :  * all of its workers or for a specific worker, it may want to call this
                                665                 :                :  * function before doing so.  If not, it must make some other provision for
                                666                 :                :  * the failure-to-start case, lest it wait forever.  On the other hand, a
                                667                 :                :  * leader which never waits for a worker that might not be started yet, or
                                668                 :                :  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
                                669                 :                :  * call this function at all.
                                670                 :                :  */
                                671                 :                : void
 2263                           672                 :             72 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
                                673                 :                : {
                                674                 :                :     int         i;
                                675                 :                : 
                                676                 :                :     /* Skip this if we have no launched workers. */
                                677         [ -  + ]:             72 :     if (pcxt->nworkers_launched == 0)
 2263 rhaas@postgresql.org      678                 :UBC           0 :         return;
                                679                 :                : 
                                680                 :                :     for (;;)
                                681                 :                :     {
                                682                 :                :         /*
                                683                 :                :          * This will process any parallel messages that are pending and it may
                                684                 :                :          * also throw an error propagated from a worker.
                                685                 :                :          */
 2263 rhaas@postgresql.org      686         [ +  + ]:CBC      324899 :         CHECK_FOR_INTERRUPTS();
                                687                 :                : 
                                688         [ +  + ]:         649800 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
                                689                 :                :         {
                                690                 :                :             BgwHandleStatus status;
                                691                 :                :             shm_mq     *mq;
                                692                 :                :             int         rc;
                                693                 :                :             pid_t       pid;
                                694                 :                : 
                                695         [ +  + ]:         324901 :             if (pcxt->known_attached_workers[i])
                                696                 :             15 :                 continue;
                                697                 :                : 
                                698                 :                :             /*
                                699                 :                :              * If error_mqh is NULL, then the worker has already exited
                                700                 :                :              * cleanly.
                                701                 :                :              */
                                702         [ -  + ]:         324886 :             if (pcxt->worker[i].error_mqh == NULL)
                                703                 :                :             {
 2263 rhaas@postgresql.org      704                 :UBC           0 :                 pcxt->known_attached_workers[i] = true;
                                705                 :              0 :                 ++pcxt->nknown_attached_workers;
                                706                 :              0 :                 continue;
                                707                 :                :             }
                                708                 :                : 
 2263 rhaas@postgresql.org      709                 :CBC      324886 :             status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
                                710         [ +  + ]:         324886 :             if (status == BGWH_STARTED)
                                711                 :                :             {
                                712                 :                :                 /* Has the worker attached to the error queue? */
                                713                 :         324845 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                                714         [ +  + ]:         324845 :                 if (shm_mq_get_sender(mq) != NULL)
                                715                 :                :                 {
                                716                 :                :                     /* Yes, so it is known to be attached. */
                                717                 :             59 :                     pcxt->known_attached_workers[i] = true;
                                718                 :             59 :                     ++pcxt->nknown_attached_workers;
                                719                 :                :                 }
                                720                 :                :             }
                                721         [ -  + ]:             41 :             else if (status == BGWH_STOPPED)
                                722                 :                :             {
                                723                 :                :                 /*
                                724                 :                :                  * If the worker stopped without attaching to the error queue,
                                725                 :                :                  * throw an error.
                                726                 :                :                  */
 2263 rhaas@postgresql.org      727                 :UBC           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                                728         [ #  # ]:              0 :                 if (shm_mq_get_sender(mq) == NULL)
                                729         [ #  # ]:              0 :                     ereport(ERROR,
                                730                 :                :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                731                 :                :                              errmsg("parallel worker failed to initialize"),
                                732                 :                :                              errhint("More details may be available in the server log.")));
                                733                 :                : 
                                734                 :              0 :                 pcxt->known_attached_workers[i] = true;
                                735                 :              0 :                 ++pcxt->nknown_attached_workers;
                                736                 :                :             }
                                737                 :                :             else
                                738                 :                :             {
                                739                 :                :                 /*
                                740                 :                :                  * Worker not yet started, so we must wait.  The postmaster
                                741                 :                :                  * will notify us if the worker's state changes.  Our latch
                                742                 :                :                  * might also get set for some other reason, but if so we'll
                                743                 :                :                  * just end up waiting for the same worker again.
                                744                 :                :                  */
 2263 rhaas@postgresql.org      745                 :CBC          41 :                 rc = WaitLatch(MyLatch,
                                746                 :                :                                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
                                747                 :                :                                -1, WAIT_EVENT_BGWORKER_STARTUP);
                                748                 :                : 
                                749         [ +  - ]:             41 :                 if (rc & WL_LATCH_SET)
                                750                 :             41 :                     ResetLatch(MyLatch);
                                751                 :                :             }
                                752                 :                :         }
                                753                 :                : 
                                754                 :                :         /* If all workers are known to have started, we're done. */
                                755         [ +  + ]:         324899 :         if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
                                756                 :                :         {
                                757         [ -  + ]:             72 :             Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
                                758                 :             72 :             break;
                                759                 :                :         }
                                760                 :                :     }
                                761                 :                : }
                                762                 :                : 
                                763                 :                : /*
                                764                 :                :  * Wait for all workers to finish computing.
                                765                 :                :  *
                                766                 :                :  * Even if the parallel operation seems to have completed successfully, it's
                                767                 :                :  * important to call this function afterwards.  We must not miss any errors
                                768                 :                :  * the workers may have thrown during the parallel operation, or any that they
                                769                 :                :  * may yet throw while shutting down.
                                770                 :                :  *
                                771                 :                :  * Also, we want to update our notion of XactLastRecEnd based on worker
                                772                 :                :  * feedback.
                                773                 :                :  */
                                774                 :                : void
 3272                           775                 :            671 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
                                776                 :                : {
                                777                 :                :     for (;;)
                                778                 :            302 :     {
 3249 bruce@momjian.us          779                 :            973 :         bool        anyone_alive = false;
 2273 rhaas@postgresql.org      780                 :            973 :         int         nfinished = 0;
                                781                 :                :         int         i;
                                782                 :                : 
                                783                 :                :         /*
                                784                 :                :          * This will process any parallel messages that are pending, which may
                                785                 :                :          * change the outcome of the loop that follows.  It may also throw an
                                786                 :                :          * error propagated from a worker.
                                787                 :                :          */
 3272                           788         [ +  + ]:            973 :         CHECK_FOR_INTERRUPTS();
                                789                 :                : 
 2963                           790         [ +  + ]:           3518 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
                                791                 :                :         {
                                792                 :                :             /*
                                793                 :                :              * If error_mqh is NULL, then the worker has already exited
                                794                 :                :              * cleanly.  If we have received a message through error_mqh from
                                795                 :                :              * the worker, we know it started up cleanly, and therefore we're
                                796                 :                :              * certain to be notified when it exits.
                                797                 :                :              */
 2273                           798         [ +  + ]:           2550 :             if (pcxt->worker[i].error_mqh == NULL)
                                799                 :           2103 :                 ++nfinished;
 2263                           800         [ +  + ]:            447 :             else if (pcxt->known_attached_workers[i])
                                801                 :                :             {
 3272                           802                 :              5 :                 anyone_alive = true;
                                803                 :              5 :                 break;
                                804                 :                :             }
                                805                 :                :         }
                                806                 :                : 
                                807         [ +  + ]:            973 :         if (!anyone_alive)
                                808                 :                :         {
                                809                 :                :             /* If all workers are known to have finished, we're done. */
 2273                           810         [ +  + ]:            968 :             if (nfinished >= pcxt->nworkers_launched)
                                811                 :                :             {
                                812         [ -  + ]:            671 :                 Assert(nfinished == pcxt->nworkers_launched);
                                813                 :            671 :                 break;
                                814                 :                :             }
                                815                 :                : 
                                816                 :                :             /*
                                817                 :                :              * We didn't detect any living workers, but not all workers are
                                818                 :                :              * known to have exited cleanly.  Either not all workers have
                                819                 :                :              * launched yet, or maybe some of them failed to start or
                                820                 :                :              * terminated abnormally.
                                821                 :                :              */
                                822         [ +  + ]:           1110 :             for (i = 0; i < pcxt->nworkers_launched; ++i)
                                823                 :                :             {
                                824                 :                :                 pid_t       pid;
                                825                 :                :                 shm_mq     *mq;
                                826                 :                : 
                                827                 :                :                 /*
                                828                 :                :                  * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
                                829                 :                :                  * should just keep waiting.  If it is BGWH_STOPPED, then
                                830                 :                :                  * further investigation is needed.
                                831                 :                :                  */
                                832         [ +  + ]:            813 :                 if (pcxt->worker[i].error_mqh == NULL ||
                                833   [ +  -  +  - ]:            884 :                     pcxt->worker[i].bgwhandle == NULL ||
                                834                 :            442 :                     GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
                                835                 :                :                                            &pid) != BGWH_STOPPED)
                                836                 :            813 :                     continue;
                                837                 :                : 
                                838                 :                :                 /*
                                839                 :                :                  * Check whether the worker ended up stopped without ever
                                840                 :                :                  * attaching to the error queue.  If so, the postmaster was
                                841                 :                :                  * unable to fork the worker or it exited without initializing
                                842                 :                :                  * properly.  We must throw an error, since the caller may
                                843                 :                :                  * have been expecting the worker to do some work before
                                844                 :                :                  * exiting.
                                845                 :                :                  */
 2273 rhaas@postgresql.org      846                 :UBC           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                                847         [ #  # ]:              0 :                 if (shm_mq_get_sender(mq) == NULL)
                                848         [ #  # ]:              0 :                     ereport(ERROR,
                                849                 :                :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                850                 :                :                              errmsg("parallel worker failed to initialize"),
                                851                 :                :                              errhint("More details may be available in the server log.")));
                                852                 :                : 
                                853                 :                :                 /*
                                854                 :                :                  * The worker is stopped, but is attached to the error queue.
                                855                 :                :                  * Unless there's a bug somewhere, this will only happen when
                                856                 :                :                  * the worker writes messages and terminates after the
                                857                 :                :                  * CHECK_FOR_INTERRUPTS() near the top of this function and
                                858                 :                :                  * before the call to GetBackgroundWorkerPid().  In that case,
                                859                 :                :                  * or latch should have been set as well and the right things
                                860                 :                :                  * will happen on the next pass through the loop.
                                861                 :                :                  */
                                862                 :                :             }
                                863                 :                :         }
                                864                 :                : 
 1969 tmunro@postgresql.or      865                 :CBC         302 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
                                866                 :                :                          WAIT_EVENT_PARALLEL_FINISH);
 2504 andres@anarazel.de        867                 :            302 :         ResetLatch(MyLatch);
                                868                 :                :     }
                                869                 :                : 
 3272 rhaas@postgresql.org      870         [ +  - ]:            671 :     if (pcxt->toc != NULL)
                                871                 :                :     {
                                872                 :                :         FixedParallelState *fps;
                                873                 :                : 
 2505 tgl@sss.pgh.pa.us         874                 :            671 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
 3272 rhaas@postgresql.org      875         [ +  + ]:            671 :         if (fps->last_xlog_end > XactLastRecEnd)
                                876                 :             14 :             XactLastRecEnd = fps->last_xlog_end;
                                877                 :                :     }
                                878                 :            671 : }
                                879                 :                : 
                                880                 :                : /*
                                881                 :                :  * Wait for all workers to exit.
                                882                 :                :  *
                                883                 :                :  * This function ensures that workers have been completely shutdown.  The
                                884                 :                :  * difference between WaitForParallelWorkersToFinish and this function is
                                885                 :                :  * that the former just ensures that last message sent by a worker backend is
                                886                 :                :  * received by the leader backend whereas this ensures the complete shutdown.
                                887                 :                :  */
                                888                 :                : static void
 3089                           889                 :            544 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
                                890                 :                : {
                                891                 :                :     int         i;
                                892                 :                : 
                                893                 :                :     /* Wait until the workers actually die. */
 2963                           894         [ +  + ]:           1866 :     for (i = 0; i < pcxt->nworkers_launched; ++i)
                                895                 :                :     {
                                896                 :                :         BgwHandleStatus status;
                                897                 :                : 
 3089                           898   [ +  -  -  + ]:           1322 :         if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
 3089 rhaas@postgresql.org      899                 :UBC           0 :             continue;
                                900                 :                : 
 3089 rhaas@postgresql.org      901                 :CBC        1322 :         status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
                                902                 :                : 
                                903                 :                :         /*
                                904                 :                :          * If the postmaster kicked the bucket, we have no chance of cleaning
                                905                 :                :          * up safely -- we won't be able to tell when our workers are actually
                                906                 :                :          * dead.  This doesn't necessitate a PANIC since they will all abort
                                907                 :                :          * eventually, but we can't safely continue this session.
                                908                 :                :          */
                                909         [ -  + ]:           1322 :         if (status == BGWH_POSTMASTER_DIED)
 3089 rhaas@postgresql.org      910         [ #  # ]:UBC           0 :             ereport(FATAL,
                                911                 :                :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
                                912                 :                :                      errmsg("postmaster exited during a parallel transaction")));
                                913                 :                : 
                                914                 :                :         /* Release memory. */
 3089 rhaas@postgresql.org      915                 :CBC        1322 :         pfree(pcxt->worker[i].bgwhandle);
                                916                 :           1322 :         pcxt->worker[i].bgwhandle = NULL;
                                917                 :                :     }
                                918                 :            544 : }
                                919                 :                : 
                                920                 :                : /*
                                921                 :                :  * Destroy a parallel context.
                                922                 :                :  *
                                923                 :                :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
                                924                 :                :  * first, before calling this function.  When this function is invoked, any
                                925                 :                :  * remaining workers are forcibly killed; the dynamic shared memory segment
                                926                 :                :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
                                927                 :                :  */
                                928                 :                : void
 3272                           929                 :            414 : DestroyParallelContext(ParallelContext *pcxt)
                                930                 :                : {
                                931                 :                :     int         i;
                                932                 :                : 
                                933                 :                :     /*
                                934                 :                :      * Be careful about order of operations here!  We remove the parallel
                                935                 :                :      * context from the list before we do anything else; otherwise, if an
                                936                 :                :      * error occurs during a subsequent step, we might try to nuke it again
                                937                 :                :      * from AtEOXact_Parallel or AtEOSubXact_Parallel.
                                938                 :                :      */
                                939                 :            414 :     dlist_delete(&pcxt->node);
                                940                 :                : 
                                941                 :                :     /* Kill each worker in turn, and forget their error queues. */
 3119                           942         [ +  - ]:            414 :     if (pcxt->worker != NULL)
                                943                 :                :     {
 2963                           944         [ +  + ]:           1323 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
                                945                 :                :         {
 3119                           946         [ +  + ]:            909 :             if (pcxt->worker[i].error_mqh != NULL)
                                947                 :                :             {
 3089                           948                 :              3 :                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
                                949                 :                : 
 2418 tgl@sss.pgh.pa.us         950                 :              3 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
 3119 rhaas@postgresql.org      951                 :              3 :                 pcxt->worker[i].error_mqh = NULL;
                                952                 :                :             }
                                953                 :                :         }
                                954                 :                :     }
                                955                 :                : 
                                956                 :                :     /*
                                957                 :                :      * If we have allocated a shared memory segment, detach it.  This will
                                958                 :                :      * implicitly detach the error queues, and any other shared memory queues,
                                959                 :                :      * stored there.
                                960                 :                :      */
 3272                           961         [ +  - ]:            414 :     if (pcxt->seg != NULL)
                                962                 :                :     {
                                963                 :            414 :         dsm_detach(pcxt->seg);
                                964                 :            414 :         pcxt->seg = NULL;
                                965                 :                :     }
                                966                 :                : 
                                967                 :                :     /*
                                968                 :                :      * If this parallel context is actually in backend-private memory rather
                                969                 :                :      * than shared memory, free that memory instead.
                                970                 :                :      */
 3267                           971         [ -  + ]:            414 :     if (pcxt->private_memory != NULL)
                                972                 :                :     {
 3267 rhaas@postgresql.org      973                 :UBC           0 :         pfree(pcxt->private_memory);
                                974                 :              0 :         pcxt->private_memory = NULL;
                                975                 :                :     }
                                976                 :                : 
                                977                 :                :     /*
                                978                 :                :      * We can't finish transaction commit or abort until all of the workers
                                979                 :                :      * have exited.  This means, in particular, that we can't respond to
                                980                 :                :      * interrupts at this stage.
                                981                 :                :      */
 3089 rhaas@postgresql.org      982                 :CBC         414 :     HOLD_INTERRUPTS();
                                983                 :            414 :     WaitForParallelWorkersToExit(pcxt);
                                984         [ -  + ]:            414 :     RESUME_INTERRUPTS();
                                985                 :                : 
                                986                 :                :     /* Free the worker array itself. */
 3272                           987         [ +  - ]:            414 :     if (pcxt->worker != NULL)
                                988                 :                :     {
                                989                 :            414 :         pfree(pcxt->worker);
                                990                 :            414 :         pcxt->worker = NULL;
                                991                 :                :     }
                                992                 :                : 
                                993                 :                :     /* Free memory. */
 2557 tgl@sss.pgh.pa.us         994                 :            414 :     pfree(pcxt->library_name);
                                995                 :            414 :     pfree(pcxt->function_name);
 3272 rhaas@postgresql.org      996                 :            414 :     pfree(pcxt);
                                997                 :            414 : }
                                998                 :                : 
                                999                 :                : /*
                               1000                 :                :  * Are there any parallel contexts currently active?
                               1001                 :                :  */
                               1002                 :                : bool
                               1003                 :         429210 : ParallelContextActive(void)
                               1004                 :                : {
                               1005                 :         429210 :     return !dlist_is_empty(&pcxt_list);
                               1006                 :                : }
                               1007                 :                : 
                               1008                 :                : /*
                               1009                 :                :  * Handle receipt of an interrupt indicating a parallel worker message.
                               1010                 :                :  *
                               1011                 :                :  * Note: this is called within a signal handler!  All we can do is set
                               1012                 :                :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
                               1013                 :                :  * HandleParallelMessages().
                               1014                 :                :  */
                               1015                 :                : void
                               1016                 :           1607 : HandleParallelMessageInterrupt(void)
                               1017                 :                : {
                               1018                 :           1607 :     InterruptPending = true;
                               1019                 :           1607 :     ParallelMessagePending = true;
                               1020                 :           1607 :     SetLatch(MyLatch);
                               1021                 :           1607 : }
                               1022                 :                : 
                               1023                 :                : /*
                               1024                 :                :  * Handle any queued protocol messages received from parallel workers.
                               1025                 :                :  */
                               1026                 :                : void
                               1027                 :           1294 : HandleParallelMessages(void)
                               1028                 :                : {
                               1029                 :                :     dlist_iter  iter;
                               1030                 :                :     MemoryContext oldcontext;
                               1031                 :                : 
                               1032                 :                :     static MemoryContext hpm_context = NULL;
                               1033                 :                : 
                               1034                 :                :     /*
                               1035                 :                :      * This is invoked from ProcessInterrupts(), and since some of the
                               1036                 :                :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
                               1037                 :                :      * for recursive calls if more signals are received while this runs.  It's
                               1038                 :                :      * unclear that recursive entry would be safe, and it doesn't seem useful
                               1039                 :                :      * even if it is safe, so let's block interrupts until done.
                               1040                 :                :      */
 2812 tgl@sss.pgh.pa.us        1041                 :           1294 :     HOLD_INTERRUPTS();
                               1042                 :                : 
                               1043                 :                :     /*
                               1044                 :                :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
                               1045                 :                :      * don't want to risk leaking data into long-lived contexts, so let's do
                               1046                 :                :      * our work here in a private context that we can reset on each use.
                               1047                 :                :      */
 2788                          1048         [ +  + ]:           1294 :     if (hpm_context == NULL)    /* first time through? */
                               1049                 :             59 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
                               1050                 :                :                                             "HandleParallelMessages",
                               1051                 :                :                                             ALLOCSET_DEFAULT_SIZES);
                               1052                 :                :     else
                               1053                 :           1235 :         MemoryContextReset(hpm_context);
                               1054                 :                : 
                               1055                 :           1294 :     oldcontext = MemoryContextSwitchTo(hpm_context);
                               1056                 :                : 
                               1057                 :                :     /* OK to process messages.  Reset the flag saying there are more to do. */
 3272 rhaas@postgresql.org     1058                 :           1294 :     ParallelMessagePending = false;
                               1059                 :                : 
                               1060   [ +  -  +  + ]:           2601 :     dlist_foreach(iter, &pcxt_list)
                               1061                 :                :     {
                               1062                 :                :         ParallelContext *pcxt;
                               1063                 :                :         int         i;
                               1064                 :                : 
                               1065                 :           1310 :         pcxt = dlist_container(ParallelContext, node, iter.cur);
                               1066         [ -  + ]:           1310 :         if (pcxt->worker == NULL)
 3272 rhaas@postgresql.org     1067                 :UBC           0 :             continue;
                               1068                 :                : 
 2963 rhaas@postgresql.org     1069         [ +  + ]:CBC        5193 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
                               1070                 :                :         {
                               1071                 :                :             /*
                               1072                 :                :              * Read as many messages as we can from each worker, but stop when
                               1073                 :                :              * either (1) the worker's error queue goes away, which can happen
                               1074                 :                :              * if we receive a Terminate message from the worker; or (2) no
                               1075                 :                :              * more messages can be read from the worker without blocking.
                               1076                 :                :              */
 3272                          1077         [ +  + ]:           5207 :             while (pcxt->worker[i].error_mqh != NULL)
                               1078                 :                :             {
                               1079                 :                :                 shm_mq_result res;
                               1080                 :                :                 Size        nbytes;
                               1081                 :                :                 void       *data;
                               1082                 :                : 
                               1083                 :           2353 :                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
                               1084                 :                :                                      &data, true);
                               1085         [ +  + ]:           2353 :                 if (res == SHM_MQ_WOULD_BLOCK)
                               1086                 :           1029 :                     break;
                               1087         [ +  - ]:           1324 :                 else if (res == SHM_MQ_SUCCESS)
                               1088                 :                :                 {
                               1089                 :                :                     StringInfoData msg;
                               1090                 :                : 
                               1091                 :           1324 :                     initStringInfo(&msg);
                               1092                 :           1324 :                     appendBinaryStringInfo(&msg, data, nbytes);
                               1093                 :           1324 :                     HandleParallelMessage(pcxt, i, &msg);
                               1094                 :           1321 :                     pfree(msg.data);
                               1095                 :                :                 }
                               1096                 :                :                 else
 3272 rhaas@postgresql.org     1097         [ #  # ]:UBC           0 :                     ereport(ERROR,
                               1098                 :                :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1099                 :                :                              errmsg("lost connection to parallel worker")));
                               1100                 :                :             }
                               1101                 :                :         }
                               1102                 :                :     }
                               1103                 :                : 
 2788 tgl@sss.pgh.pa.us        1104                 :CBC        1291 :     MemoryContextSwitchTo(oldcontext);
                               1105                 :                : 
                               1106                 :                :     /* Might as well clear the context on our way out */
                               1107                 :           1291 :     MemoryContextReset(hpm_context);
                               1108                 :                : 
 2812                          1109         [ -  + ]:           1291 :     RESUME_INTERRUPTS();
 3272 rhaas@postgresql.org     1110                 :           1291 : }
                               1111                 :                : 
                               1112                 :                : /*
                               1113                 :                :  * Handle a single protocol message received from a single parallel worker.
                               1114                 :                :  */
                               1115                 :                : static void
                               1116                 :           1324 : HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
                               1117                 :                : {
                               1118                 :                :     char        msgtype;
                               1119                 :                : 
 2263                          1120         [ +  - ]:           1324 :     if (pcxt->known_attached_workers != NULL &&
                               1121         [ +  + ]:           1324 :         !pcxt->known_attached_workers[i])
                               1122                 :                :     {
                               1123                 :           1263 :         pcxt->known_attached_workers[i] = true;
                               1124                 :           1263 :         pcxt->nknown_attached_workers++;
                               1125                 :                :     }
                               1126                 :                : 
 3272                          1127                 :           1324 :     msgtype = pq_getmsgbyte(msg);
                               1128                 :                : 
                               1129   [ +  -  +  +  :           1324 :     switch (msgtype)
                                                 - ]
                               1130                 :                :     {
  236 nathan@postgresql.or     1131                 :GNC           3 :         case PqMsg_ErrorResponse:
                               1132                 :                :         case PqMsg_NoticeResponse:
                               1133                 :                :             {
                               1134                 :                :                 ErrorData   edata;
                               1135                 :                :                 ErrorContextCallback *save_error_context_stack;
                               1136                 :                : 
                               1137                 :                :                 /* Parse ErrorResponse or NoticeResponse. */
 3272 rhaas@postgresql.org     1138                 :CBC           3 :                 pq_parse_errornotice(msg, &edata);
                               1139                 :                : 
                               1140                 :                :                 /* Death of a worker isn't enough justification for suicide. */
                               1141                 :              3 :                 edata.elevel = Min(edata.elevel, ERROR);
                               1142                 :                : 
                               1143                 :                :                 /*
                               1144                 :                :                  * If desired, add a context line to show that this is a
                               1145                 :                :                  * message propagated from a parallel worker.  Otherwise, it
                               1146                 :                :                  * can sometimes be confusing to understand what actually
                               1147                 :                :                  * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
                               1148                 :                :                  * because it causes test-result instability depending on
                               1149                 :                :                  * whether a parallel worker is actually used or not.)
                               1150                 :                :                  */
  424 drowley@postgresql.o     1151         [ +  - ]:              3 :                 if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
                               1152                 :                :                 {
 2788 tgl@sss.pgh.pa.us        1153         [ -  + ]:              3 :                     if (edata.context)
 2788 tgl@sss.pgh.pa.us        1154                 :UBC           0 :                         edata.context = psprintf("%s\n%s", edata.context,
                               1155                 :                :                                                  _("parallel worker"));
                               1156                 :                :                     else
 2788 tgl@sss.pgh.pa.us        1157                 :CBC           3 :                         edata.context = pstrdup(_("parallel worker"));
                               1158                 :                :                 }
                               1159                 :                : 
                               1160                 :                :                 /*
                               1161                 :                :                  * Context beyond that should use the error context callbacks
                               1162                 :                :                  * that were in effect when the ParallelContext was created,
                               1163                 :                :                  * not the current ones.
                               1164                 :                :                  */
                               1165                 :              3 :                 save_error_context_stack = error_context_stack;
                               1166                 :              3 :                 error_context_stack = pcxt->error_context_stack;
                               1167                 :                : 
                               1168                 :                :                 /* Rethrow error or print notice. */
 3272 rhaas@postgresql.org     1169                 :              3 :                 ThrowErrorData(&edata);
                               1170                 :                : 
                               1171                 :                :                 /* Not an error, so restore previous context stack. */
 3272 rhaas@postgresql.org     1172                 :UBC           0 :                 error_context_stack = save_error_context_stack;
                               1173                 :                : 
                               1174                 :              0 :                 break;
                               1175                 :                :             }
                               1176                 :                : 
  236 nathan@postgresql.or     1177                 :UNC           0 :         case PqMsg_NotificationResponse:
                               1178                 :                :             {
                               1179                 :                :                 /* Propagate NotifyResponse. */
                               1180                 :                :                 int32       pid;
                               1181                 :                :                 const char *channel;
                               1182                 :                :                 const char *payload;
                               1183                 :                : 
 2845 rhaas@postgresql.org     1184                 :UBC           0 :                 pid = pq_getmsgint(msg, 4);
                               1185                 :              0 :                 channel = pq_getmsgrawstring(msg);
                               1186                 :              0 :                 payload = pq_getmsgrawstring(msg);
                               1187                 :              0 :                 pq_endmessage(msg);
                               1188                 :                : 
                               1189                 :              0 :                 NotifyMyFrontEnd(channel, payload, pid);
                               1190                 :                : 
 3272                          1191                 :              0 :                 break;
                               1192                 :                :             }
                               1193                 :                : 
  278 msawada@postgresql.o     1194                 :GNC           2 :         case 'P':               /* Parallel progress reporting */
                               1195                 :                :             {
                               1196                 :                :                 /*
                               1197                 :                :                  * Only incremental progress reporting is currently supported.
                               1198                 :                :                  * However, it's possible to add more fields to the message to
                               1199                 :                :                  * allow for handling of other backend progress APIs.
                               1200                 :                :                  */
                               1201                 :              2 :                 int         index = pq_getmsgint(msg, 4);
                               1202                 :              2 :                 int64       incr = pq_getmsgint64(msg);
                               1203                 :                : 
                               1204                 :              2 :                 pq_getmsgend(msg);
                               1205                 :                : 
                               1206                 :              2 :                 pgstat_progress_incr_param(index, incr);
                               1207                 :                : 
                               1208                 :              2 :                 break;
                               1209                 :                :             }
                               1210                 :                : 
  236 nathan@postgresql.or     1211                 :           1319 :         case PqMsg_Terminate:
                               1212                 :                :             {
 2418 tgl@sss.pgh.pa.us        1213                 :CBC        1319 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
 3272 rhaas@postgresql.org     1214                 :           1319 :                 pcxt->worker[i].error_mqh = NULL;
                               1215                 :           1319 :                 break;
                               1216                 :                :             }
                               1217                 :                : 
 3272 rhaas@postgresql.org     1218                 :UBC           0 :         default:
                               1219                 :                :             {
 2813 tgl@sss.pgh.pa.us        1220         [ #  # ]:              0 :                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                               1221                 :                :                      msgtype, msg->len);
                               1222                 :                :             }
                               1223                 :                :     }
 3272 rhaas@postgresql.org     1224                 :CBC        1321 : }
                               1225                 :                : 
                               1226                 :                : /*
                               1227                 :                :  * End-of-subtransaction cleanup for parallel contexts.
                               1228                 :                :  *
                               1229                 :                :  * Here we remove only parallel contexts initiated within the current
                               1230                 :                :  * subtransaction.
                               1231                 :                :  */
                               1232                 :                : void
                               1233                 :           9927 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
                               1234                 :                : {
                               1235         [ +  + ]:           9930 :     while (!dlist_is_empty(&pcxt_list))
                               1236                 :                :     {
                               1237                 :                :         ParallelContext *pcxt;
                               1238                 :                : 
                               1239                 :              3 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
                               1240         [ -  + ]:              3 :         if (pcxt->subid != mySubId)
 3272 rhaas@postgresql.org     1241                 :UBC           0 :             break;
 3272 rhaas@postgresql.org     1242         [ -  + ]:CBC           3 :         if (isCommit)
 3272 rhaas@postgresql.org     1243         [ #  # ]:UBC           0 :             elog(WARNING, "leaked parallel context");
 3272 rhaas@postgresql.org     1244                 :CBC           3 :         DestroyParallelContext(pcxt);
                               1245                 :                :     }
                               1246                 :           9927 : }
                               1247                 :                : 
                               1248                 :                : /*
                               1249                 :                :  * End-of-transaction cleanup for parallel contexts.
                               1250                 :                :  *
                               1251                 :                :  * We nuke all remaining parallel contexts.
                               1252                 :                :  */
                               1253                 :                : void
                               1254                 :         431197 : AtEOXact_Parallel(bool isCommit)
                               1255                 :                : {
                               1256         [ -  + ]:         431197 :     while (!dlist_is_empty(&pcxt_list))
                               1257                 :                :     {
                               1258                 :                :         ParallelContext *pcxt;
                               1259                 :                : 
 3272 rhaas@postgresql.org     1260                 :UBC           0 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
                               1261         [ #  # ]:              0 :         if (isCommit)
                               1262         [ #  # ]:              0 :             elog(WARNING, "leaked parallel context");
                               1263                 :              0 :         DestroyParallelContext(pcxt);
                               1264                 :                :     }
 3272 rhaas@postgresql.org     1265                 :CBC      431197 : }
                               1266                 :                : 
                               1267                 :                : /*
                               1268                 :                :  * Main entrypoint for parallel workers.
                               1269                 :                :  */
                               1270                 :                : void
                               1271                 :           1322 : ParallelWorkerMain(Datum main_arg)
                               1272                 :                : {
                               1273                 :                :     dsm_segment *seg;
                               1274                 :                :     shm_toc    *toc;
                               1275                 :                :     FixedParallelState *fps;
                               1276                 :                :     char       *error_queue_space;
                               1277                 :                :     shm_mq     *mq;
                               1278                 :                :     shm_mq_handle *mqh;
                               1279                 :                :     char       *libraryspace;
                               1280                 :                :     char       *entrypointstate;
                               1281                 :                :     char       *library_name;
                               1282                 :                :     char       *function_name;
                               1283                 :                :     parallel_worker_main_type entrypt;
                               1284                 :                :     char       *gucspace;
                               1285                 :                :     char       *combocidspace;
                               1286                 :                :     char       *tsnapspace;
                               1287                 :                :     char       *asnapspace;
                               1288                 :                :     char       *tstatespace;
                               1289                 :                :     char       *pendingsyncsspace;
                               1290                 :                :     char       *reindexspace;
                               1291                 :                :     char       *relmapperspace;
                               1292                 :                :     char       *uncommittedenumsspace;
                               1293                 :                :     char       *clientconninfospace;
                               1294                 :                :     char       *session_dsm_handle_space;
                               1295                 :                :     Snapshot    tsnapshot;
                               1296                 :                :     Snapshot    asnapshot;
                               1297                 :                : 
                               1298                 :                :     /* Set flag to indicate that we're initializing a parallel worker. */
 3103                          1299                 :           1322 :     InitializingParallelWorker = true;
                               1300                 :                : 
                               1301                 :                :     /* Establish signal handlers. */
 3272                          1302                 :           1322 :     pqsignal(SIGTERM, die);
                               1303                 :           1322 :     BackgroundWorkerUnblockSignals();
                               1304                 :                : 
                               1305                 :                :     /* Determine and set our parallel worker number. */
 3083                          1306         [ -  + ]:           1322 :     Assert(ParallelWorkerNumber == -1);
                               1307                 :           1322 :     memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
                               1308                 :                : 
                               1309                 :                :     /* Set up a memory context to work in, just for cleanliness. */
 3272                          1310                 :           1322 :     CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
                               1311                 :                :                                                  "Parallel worker",
                               1312                 :                :                                                  ALLOCSET_DEFAULT_SIZES);
                               1313                 :                : 
                               1314                 :                :     /*
                               1315                 :                :      * Attach to the dynamic shared memory segment for the parallel query, and
                               1316                 :                :      * find its table of contents.
                               1317                 :                :      *
                               1318                 :                :      * Note: at this point, we have not created any ResourceOwner in this
                               1319                 :                :      * process.  This will result in our DSM mapping surviving until process
                               1320                 :                :      * exit, which is fine.  If there were a ResourceOwner, it would acquire
                               1321                 :                :      * ownership of the mapping, but we have no need for that.
                               1322                 :                :      */
                               1323                 :           1322 :     seg = dsm_attach(DatumGetUInt32(main_arg));
                               1324         [ -  + ]:           1322 :     if (seg == NULL)
 3272 rhaas@postgresql.org     1325         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1326                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1327                 :                :                  errmsg("could not map dynamic shared memory segment")));
 3272 rhaas@postgresql.org     1328                 :CBC        1322 :     toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
                               1329         [ -  + ]:           1322 :     if (toc == NULL)
 3272 rhaas@postgresql.org     1330         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1331                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1332                 :                :                  errmsg("invalid magic number in dynamic shared memory segment")));
                               1333                 :                : 
                               1334                 :                :     /* Look up fixed parallel state. */
 2505 tgl@sss.pgh.pa.us        1335                 :CBC        1322 :     fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
 3272 rhaas@postgresql.org     1336                 :           1322 :     MyFixedParallelState = fps;
                               1337                 :                : 
                               1338                 :                :     /* Arrange to signal the leader if we exit. */
 1400 andres@anarazel.de       1339                 :           1322 :     ParallelLeaderPid = fps->parallel_leader_pid;
   42 heikki.linnakangas@i     1340                 :GNC        1322 :     ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
 1110 andres@anarazel.de       1341                 :CBC        1322 :     before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
                               1342                 :                : 
                               1343                 :                :     /*
                               1344                 :                :      * Now we can find and attach to the error queue provided for us.  That's
                               1345                 :                :      * good, because until we do that, any errors that happen here will not be
                               1346                 :                :      * reported back to the process that requested that this worker be
                               1347                 :                :      * launched.
                               1348                 :                :      */
 2505 tgl@sss.pgh.pa.us        1349                 :           1322 :     error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
 3272 rhaas@postgresql.org     1350                 :           1322 :     mq = (shm_mq *) (error_queue_space +
 3249 bruce@momjian.us         1351                 :           1322 :                      ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
 3272 rhaas@postgresql.org     1352                 :           1322 :     shm_mq_set_sender(mq, MyProc);
                               1353                 :           1322 :     mqh = shm_mq_attach(mq, seg, NULL);
 3103                          1354                 :           1322 :     pq_redirect_to_shm_mq(seg, mqh);
 1400 andres@anarazel.de       1355                 :           1322 :     pq_set_parallel_leader(fps->parallel_leader_pid,
                               1356                 :                :                            fps->parallel_leader_proc_number);
                               1357                 :                : 
                               1358                 :                :     /*
                               1359                 :                :      * Hooray! Primary initialization is complete.  Now, we need to set up our
                               1360                 :                :      * backend-local state to match the original backend.
                               1361                 :                :      */
                               1362                 :                : 
                               1363                 :                :     /*
                               1364                 :                :      * Join locking group.  We must do this before anything that could try to
                               1365                 :                :      * acquire a heavyweight lock, because any heavyweight locks acquired to
                               1366                 :                :      * this point could block either directly against the parallel group
                               1367                 :                :      * leader or against some process which in turn waits for a lock that
                               1368                 :                :      * conflicts with the parallel group leader, causing an undetected
                               1369                 :                :      * deadlock.  (If we can't join the lock group, the leader has gone away,
                               1370                 :                :      * so just exit quietly.)
                               1371                 :                :      */
                               1372         [ -  + ]:           1322 :     if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
                               1373                 :                :                                fps->parallel_leader_pid))
 2989 rhaas@postgresql.org     1374                 :UBC           0 :         return;
                               1375                 :                : 
                               1376                 :                :     /*
                               1377                 :                :      * Restore transaction and statement start-time timestamps.  This must
                               1378                 :                :      * happen before anything that would start a transaction, else asserts in
                               1379                 :                :      * xact.c will fire.
                               1380                 :                :      */
 2017 tgl@sss.pgh.pa.us        1381                 :CBC        1322 :     SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
                               1382                 :                : 
                               1383                 :                :     /*
                               1384                 :                :      * Identify the entry point to be called.  In theory this could result in
                               1385                 :                :      * loading an additional library, though most likely the entry point is in
                               1386                 :                :      * the core backend or in a library we just loaded.
                               1387                 :                :      */
 2505                          1388                 :           1322 :     entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
 2557                          1389                 :           1322 :     library_name = entrypointstate;
                               1390                 :           1322 :     function_name = entrypointstate + strlen(library_name) + 1;
                               1391                 :                : 
                               1392                 :           1322 :     entrypt = LookupParallelWorkerFunction(library_name, function_name);
                               1393                 :                : 
                               1394                 :                :     /* Restore database connection. */
 3272 rhaas@postgresql.org     1395                 :           1322 :     BackgroundWorkerInitializeConnectionByOid(fps->database_id,
                               1396                 :                :                                               fps->authenticated_user_id,
                               1397                 :                :                                               0);
                               1398                 :                : 
                               1399                 :                :     /*
                               1400                 :                :      * Set the client encoding to the database encoding, since that is what
                               1401                 :                :      * the leader will expect.
                               1402                 :                :      */
 2845                          1403                 :           1322 :     SetClientEncoding(GetDatabaseEncoding());
                               1404                 :                : 
                               1405                 :                :     /*
                               1406                 :                :      * Load libraries that were loaded by original backend.  We want to do
                               1407                 :                :      * this before restoring GUCs, because the libraries might define custom
                               1408                 :                :      * variables.
                               1409                 :                :      */
 2033 tmunro@postgresql.or     1410                 :           1322 :     libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
                               1411                 :           1322 :     StartTransactionCommand();
                               1412                 :           1322 :     RestoreLibraryState(libraryspace);
                               1413                 :                : 
                               1414                 :                :     /* Restore GUC values from launching backend. */
 2505 tgl@sss.pgh.pa.us        1415                 :           1322 :     gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
 3272 rhaas@postgresql.org     1416                 :           1322 :     RestoreGUCState(gucspace);
                               1417                 :           1322 :     CommitTransactionCommand();
                               1418                 :                : 
                               1419                 :                :     /* Crank up a transaction state appropriate to a parallel worker. */
 2505 tgl@sss.pgh.pa.us        1420                 :           1322 :     tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
 3272 rhaas@postgresql.org     1421                 :           1322 :     StartParallelWorkerTransaction(tstatespace);
                               1422                 :                : 
                               1423                 :                :     /* Restore combo CID state. */
 2505 tgl@sss.pgh.pa.us        1424                 :           1322 :     combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
 3272 rhaas@postgresql.org     1425                 :           1322 :     RestoreComboCIDState(combocidspace);
                               1426                 :                : 
                               1427                 :                :     /* Attach to the per-session DSM segment and contained objects. */
                               1428                 :                :     session_dsm_handle_space =
 2404 andres@anarazel.de       1429                 :           1322 :         shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
                               1430                 :           1322 :     AttachSession(*(dsm_handle *) session_dsm_handle_space);
                               1431                 :                : 
                               1432                 :                :     /*
                               1433                 :                :      * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
                               1434                 :                :      * the leader has serialized the transaction snapshot and we must restore
                               1435                 :                :      * it. At lower isolation levels, there is no transaction-lifetime
                               1436                 :                :      * snapshot, but we need TransactionXmin to get set to a value which is
                               1437                 :                :      * less than or equal to the xmin of every snapshot that will be used by
                               1438                 :                :      * this worker. The easiest way to accomplish that is to install the
                               1439                 :                :      * active snapshot as the transaction snapshot. Code running in this
                               1440                 :                :      * parallel worker might take new snapshots via GetTransactionSnapshot()
                               1441                 :                :      * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
                               1442                 :                :      * snapshot older than the active snapshot.
                               1443                 :                :      */
 2505 tgl@sss.pgh.pa.us        1444                 :           1322 :     asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
  963 rhaas@postgresql.org     1445                 :           1322 :     tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
                               1446                 :           1322 :     asnapshot = RestoreSnapshot(asnapspace);
                               1447         [ +  + ]:           1322 :     tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
                               1448                 :           1322 :     RestoreTransactionSnapshot(tsnapshot,
                               1449                 :           1322 :                                fps->parallel_leader_pgproc);
                               1450                 :           1322 :     PushActiveSnapshot(asnapshot);
                               1451                 :                : 
                               1452                 :                :     /*
                               1453                 :                :      * We've changed which tuples we can see, and must therefore invalidate
                               1454                 :                :      * system caches.
                               1455                 :                :      */
 3103                          1456                 :           1322 :     InvalidateSystemCaches();
                               1457                 :                : 
                               1458                 :                :     /*
                               1459                 :                :      * Restore current role id.  Skip verifying whether session user is
                               1460                 :                :      * allowed to become this role and blindly restore the leader's state for
                               1461                 :                :      * current role.
                               1462                 :                :      */
 2359                          1463                 :           1322 :     SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);
                               1464                 :                : 
                               1465                 :                :     /* Restore user ID and security context. */
 3272                          1466                 :           1322 :     SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
                               1467                 :                : 
                               1468                 :                :     /* Restore temp-namespace state to ensure search path matches leader's. */
 2866 tgl@sss.pgh.pa.us        1469                 :           1322 :     SetTempNamespaceState(fps->temp_namespace_id,
                               1470                 :                :                           fps->temp_toast_namespace_id);
                               1471                 :                : 
                               1472                 :                :     /* Restore pending syncs. */
 1471 noah@leadboat.com        1473                 :           1322 :     pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
                               1474                 :                :                                        false);
                               1475                 :           1322 :     RestorePendingSyncs(pendingsyncsspace);
                               1476                 :                : 
                               1477                 :                :     /* Restore reindex state. */
 2277 rhaas@postgresql.org     1478                 :           1322 :     reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
                               1479                 :           1322 :     RestoreReindexState(reindexspace);
                               1480                 :                : 
                               1481                 :                :     /* Restore relmapper state. */
 2074 pg@bowt.ie               1482                 :           1322 :     relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
                               1483                 :           1322 :     RestoreRelationMap(relmapperspace);
                               1484                 :                : 
                               1485                 :                :     /* Restore uncommitted enums. */
 1195 tmunro@postgresql.or     1486                 :           1322 :     uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
                               1487                 :                :                                            false);
                               1488                 :           1322 :     RestoreUncommittedEnums(uncommittedenumsspace);
                               1489                 :                : 
                               1490                 :                :     /* Restore the ClientConnectionInfo. */
  599 michael@paquier.xyz      1491                 :           1322 :     clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
                               1492                 :                :                                          false);
                               1493                 :           1322 :     RestoreClientConnectionInfo(clientconninfospace);
                               1494                 :                : 
                               1495                 :                :     /*
                               1496                 :                :      * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
                               1497                 :                :      * ensure that auth_method is actually valid, aka authn_id is not NULL.
                               1498                 :                :      */
  563                          1499         [ +  + ]:           1322 :     if (MyClientConnectionInfo.authn_id)
                               1500                 :              4 :         InitializeSystemUser(MyClientConnectionInfo.authn_id,
                               1501                 :                :                              hba_authname(MyClientConnectionInfo.auth_method));
                               1502                 :                : 
                               1503                 :                :     /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
 1857 tmunro@postgresql.or     1504                 :           1322 :     AttachSerializableXact(fps->serializable_xact_handle);
                               1505                 :                : 
                               1506                 :                :     /*
                               1507                 :                :      * We've initialized all of our state now; nothing should change
                               1508                 :                :      * hereafter.
                               1509                 :                :      */
 3103 rhaas@postgresql.org     1510                 :           1322 :     InitializingParallelWorker = false;
 3272                          1511                 :           1322 :     EnterParallelMode();
                               1512                 :                : 
                               1513                 :                :     /*
                               1514                 :                :      * Time to do the real work: invoke the caller-supplied code.
                               1515                 :                :      */
 2557 tgl@sss.pgh.pa.us        1516                 :           1322 :     entrypt(seg, toc);
                               1517                 :                : 
                               1518                 :                :     /* Must exit parallel mode to pop active snapshot. */
 3272 rhaas@postgresql.org     1519                 :           1319 :     ExitParallelMode();
                               1520                 :                : 
                               1521                 :                :     /* Must pop active snapshot so snapmgr.c doesn't complain. */
                               1522                 :           1319 :     PopActiveSnapshot();
                               1523                 :                : 
                               1524                 :                :     /* Shut down the parallel-worker transaction. */
                               1525                 :           1319 :     EndParallelWorkerTransaction();
                               1526                 :                : 
                               1527                 :                :     /* Detach from the per-session DSM segment. */
 2404 andres@anarazel.de       1528                 :           1319 :     DetachSession();
                               1529                 :                : 
                               1530                 :                :     /* Report success. */
  236 nathan@postgresql.or     1531                 :GNC        1319 :     pq_putmessage(PqMsg_Terminate, NULL, 0);
                               1532                 :                : }
                               1533                 :                : 
                               1534                 :                : /*
                               1535                 :                :  * Update shared memory with the ending location of the last WAL record we
                               1536                 :                :  * wrote, if it's greater than the value already stored there.
                               1537                 :                :  */
                               1538                 :                : void
 3272 rhaas@postgresql.org     1539                 :CBC        1319 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
                               1540                 :                : {
                               1541                 :           1319 :     FixedParallelState *fps = MyFixedParallelState;
                               1542                 :                : 
                               1543         [ -  + ]:           1319 :     Assert(fps != NULL);
                               1544         [ +  + ]:           1319 :     SpinLockAcquire(&fps->mutex);
                               1545         [ +  + ]:           1319 :     if (fps->last_xlog_end < last_xlog_end)
                               1546                 :             64 :         fps->last_xlog_end = last_xlog_end;
                               1547                 :           1319 :     SpinLockRelease(&fps->mutex);
                               1548                 :           1319 : }
                               1549                 :                : 
                               1550                 :                : /*
                               1551                 :                :  * Make sure the leader tries to read from our error queue one more time.
                               1552                 :                :  * This guards against the case where we exit uncleanly without sending an
                               1553                 :                :  * ErrorResponse to the leader, for example because some code calls proc_exit
                               1554                 :                :  * directly.
                               1555                 :                :  *
                               1556                 :                :  * Also explicitly detach from dsm segment so that subsystems using
                               1557                 :                :  * on_dsm_detach() have a chance to send stats before the stats subsystem is
                               1558                 :                :  * shut down as part of a before_shmem_exit() hook.
                               1559                 :                :  *
                               1560                 :                :  * One might think this could instead be solved by carefully ordering the
                               1561                 :                :  * attaching to dsm segments, so that the pgstats segments get detached from
                               1562                 :                :  * later than the parallel query one. That turns out to not work because the
                               1563                 :                :  * stats hash might need to grow which can cause new segments to be allocated,
                               1564                 :                :  * which then will be detached from earlier.
                               1565                 :                :  */
                               1566                 :                : static void
 2273                          1567                 :           1322 : ParallelWorkerShutdown(int code, Datum arg)
                               1568                 :                : {
 1400 andres@anarazel.de       1569                 :           1322 :     SendProcSignal(ParallelLeaderPid,
                               1570                 :                :                    PROCSIG_PARALLEL_MESSAGE,
                               1571                 :                :                    ParallelLeaderProcNumber);
                               1572                 :                : 
 1110                          1573                 :           1322 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
 2273 rhaas@postgresql.org     1574                 :           1322 : }
                               1575                 :                : 
                               1576                 :                : /*
                               1577                 :                :  * Look up (and possibly load) a parallel worker entry point function.
                               1578                 :                :  *
                               1579                 :                :  * For functions contained in the core code, we use library name "postgres"
                               1580                 :                :  * and consult the InternalParallelWorkers array.  External functions are
                               1581                 :                :  * looked up, and loaded if necessary, using load_external_function().
                               1582                 :                :  *
                               1583                 :                :  * The point of this is to pass function names as strings across process
                               1584                 :                :  * boundaries.  We can't pass actual function addresses because of the
                               1585                 :                :  * possibility that the function has been loaded at a different address
                               1586                 :                :  * in a different process.  This is obviously a hazard for functions in
                               1587                 :                :  * loadable libraries, but it can happen even for functions in the core code
                               1588                 :                :  * on platforms using EXEC_BACKEND (e.g., Windows).
                               1589                 :                :  *
                               1590                 :                :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
                               1591                 :                :  * in favor of applying load_external_function() for core functions too;
                               1592                 :                :  * but that raises portability issues that are not worth addressing now.
                               1593                 :                :  */
                               1594                 :                : static parallel_worker_main_type
 2557 tgl@sss.pgh.pa.us        1595                 :           1322 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
                               1596                 :                : {
                               1597                 :                :     /*
                               1598                 :                :      * If the function is to be loaded from postgres itself, search the
                               1599                 :                :      * InternalParallelWorkers array.
                               1600                 :                :      */
                               1601         [ +  - ]:           1322 :     if (strcmp(libraryname, "postgres") == 0)
                               1602                 :                :     {
                               1603                 :                :         int         i;
                               1604                 :                : 
                               1605         [ +  - ]:           1447 :         for (i = 0; i < lengthof(InternalParallelWorkers); i++)
                               1606                 :                :         {
                               1607         [ +  + ]:           1447 :             if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
                               1608                 :           1322 :                 return InternalParallelWorkers[i].fn_addr;
                               1609                 :                :         }
                               1610                 :                : 
                               1611                 :                :         /* We can only reach this by programming error. */
 2557 tgl@sss.pgh.pa.us        1612         [ #  # ]:UBC           0 :         elog(ERROR, "internal function \"%s\" not found", funcname);
                               1613                 :                :     }
                               1614                 :                : 
                               1615                 :                :     /* Otherwise load from external library. */
                               1616                 :              0 :     return (parallel_worker_main_type)
                               1617                 :              0 :         load_external_function(libraryname, funcname, true, NULL);
                               1618                 :                : }
        

Generated by: LCOV version 2.1-beta2-3-g6141622