/*-------------------------------------------------------------------------
 *
 * parallel.c
 *	  Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
#include "catalog/storage.h"
#include "commands/async.h"
#include "commands/vacuum.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"

/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 */
#define PARALLEL_ERROR_QUEUE_SIZE			16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC						0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing.  Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.
 */
#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY				UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM			UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_PENDING_SYNCS			UINT64CONST(0xFFFFFFFFFFFF000B)
#define PARALLEL_KEY_REINDEX_STATE			UINT64CONST(0xFFFFFFFFFFFF000C)
#define PARALLEL_KEY_RELMAPPER_STATE		UINT64CONST(0xFFFFFFFFFFFF000D)
#define PARALLEL_KEY_UNCOMMITTEDENUMS		UINT64CONST(0xFFFFFFFFFFFF000E)
#define PARALLEL_KEY_CLIENTCONNINFO			UINT64CONST(0xFFFFFFFFFFFF000F)

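/*
 * Illustrative sketch, not part of this file: higher-level users of this
 * module choose their own, smaller keys for the state they publish in the
 * same table of contents.  Assuming a caller-defined key, that looks like:
 *
 *		#define MYCODE_KEY_SHARED_STATE		UINT64CONST(0xE000000000000001)
 *		...
 *		shm_toc_insert(pcxt->toc, MYCODE_KEY_SHARED_STATE, shared_state);
 *
 * Any key below the reserved PARALLEL_KEY_* range above works, so long as
 * it is unique within the context.
 */
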
/* Fixed-size parallel state. */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;
	Oid			authenticated_user_id;
	Oid			current_user_id;
	Oid			outer_user_id;
	Oid			temp_namespace_id;
	Oid			temp_toast_namespace_id;
	int			sec_context;
	bool		is_superuser;
	PGPROC	   *parallel_leader_pgproc;
	pid_t		parallel_leader_pid;
	BackendId	parallel_leader_backend_id;
	TimestampTz xact_ts;
	TimestampTz stmt_ts;
	SerializableXactHandle serializable_xact_handle;

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/* Maximum XactLastRecEnd of any worker. */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;

/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int			ParallelWorkerNumber = -1;

/* Is there a parallel message pending which we need to receive? */
volatile sig_atomic_t ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool		InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelLeaderPid;

/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 */
static const struct
{
	const char *fn_name;
	parallel_worker_main_type fn_addr;
}			InternalParallelWorkers[] =

{
	{
		"ParallelQueryMain", ParallelQueryMain
	},
	{
		"_bt_parallel_build_main", _bt_parallel_build_main
	},
	{
		"parallel_vacuum_main", parallel_vacuum_main
	}
};

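/*
 * Illustrative sketch, not part of this file: an entry point outside the
 * core server is named by library and function instead of being listed
 * above.  Assuming a hypothetical extension library "myext", its worker
 * entry point would have the parallel_worker_main_type signature:
 *
 *		void
 *		myext_worker_main(dsm_segment *seg, shm_toc *toc)
 *		{
 *			void *state = shm_toc_lookup(toc, MYEXT_KEY_SHARED_STATE, false);
 *			...
 *		}
 *
 * and the leader would request it with CreateParallelContext("myext",
 * "myext_worker_main", nworkers); workers resolve the name through
 * LookupParallelWorkerFunction().
 */
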
/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);


/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 */
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
					  int nworkers)
{
	MemoryContext oldcontext;
	ParallelContext *pcxt;

	/* It is unsafe to create a parallel context if not in parallel mode. */
	Assert(IsInParallelMode());

	/* Number of workers should be non-negative. */
	Assert(nworkers >= 0);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Initialize a new ParallelContext. */
	pcxt = palloc0(sizeof(ParallelContext));
	pcxt->subid = GetCurrentSubTransactionId();
	pcxt->nworkers = nworkers;
	pcxt->nworkers_to_launch = nworkers;
	pcxt->library_name = pstrdup(library_name);
	pcxt->function_name = pstrdup(function_name);
	pcxt->error_context_stack = error_context_stack;
	shm_toc_initialize_estimator(&pcxt->estimator);
	dlist_push_head(&pcxt_list, &pcxt->node);

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);

	return pcxt;
}

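/*
 * Illustrative sketch, not part of this file: the typical life cycle of a
 * parallel operation as driven by a caller of this module (see also
 * src/backend/access/transam/README.parallel).  "my_worker_main" is a
 * placeholder for a real entry point:
 *
 *		EnterParallelMode();
 *		pcxt = CreateParallelContext("postgres", "my_worker_main", nworkers);
 *		... use pcxt->estimator to reserve space for shared state ...
 *		InitializeParallelDSM(pcxt);
 *		... shm_toc_allocate() and shm_toc_insert() the shared state ...
 *		LaunchParallelWorkers(pcxt);
 *		... do leader work, then ...
 *		WaitForParallelWorkersToFinish(pcxt);
 *		... read results back from the DSM segment ...
 *		DestroyParallelContext(pcxt);
 *		ExitParallelMode();
 */
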
/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		pendingsyncslen = 0;
	Size		reindexlen = 0;
	Size		relmapperlen = 0;
	Size		uncommittedenumslen = 0;
	Size		clientconninfolen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	dsm_handle	session_dsm_handle = DSM_HANDLE_INVALID;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Get (or create) the per-session DSM segment's handle. */
		session_dsm_handle = GetSessionDsmHandle();

		/*
		 * If we weren't able to create a per-session DSM segment, then we can
		 * continue but we can't safely launch any workers because their
		 * record typmods would be incompatible so they couldn't exchange
		 * tuples.
		 */
		if (session_dsm_handle == DSM_HANDLE_INVALID)
			pcxt->nworkers = 0;
	}

	if (pcxt->nworkers > 0)
	{
		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		if (IsolationUsesXactSnapshot())
		{
			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		}
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
		pendingsyncslen = EstimatePendingSyncsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
		reindexlen = EstimateReindexStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
		relmapperlen = EstimateRelationMapSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
		uncommittedenumslen = EstimateUncommittedEnumsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
		clientconninfolen = EstimateClientConnectionInfoSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
		/* If you add more chunks here, you probably need to add keys. */
		shm_toc_estimate_keys(&pcxt->estimator, 12);

		/* Estimate space need for error queues. */
		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/* Estimate how much we'll need for the entrypoint info. */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments have already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->outer_user_id = GetCurrentRoleId();
	fps->is_superuser = session_auth_is_superuser;
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_leader_pgproc = MyProc;
	fps->parallel_leader_pid = MyProcPid;
	fps->parallel_leader_backend_id = MyBackendId;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	fps->serializable_xact_handle = ShareSerializableXact();
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = 0;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/* We can skip the rest of this if we're not budgeting for any workers. */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *pendingsyncsspace;
		char	   *reindexspace;
		char	   *relmapperspace;
		char	   *error_queue_space;
		char	   *session_dsm_handle_space;
		char	   *entrypointstate;
		char	   *uncommittedenumsspace;
		char	   *clientconninfospace;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/*
		 * Serialize the transaction snapshot if the transaction
		 * isolation-level uses a transaction snapshot.
		 */
		if (IsolationUsesXactSnapshot())
		{
			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
			SerializeSnapshot(transaction_snapshot, tsnapspace);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
						   tsnapspace);
		}

		/* Serialize the active snapshot. */
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Provide the handle for per-session segment. */
		session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
													sizeof(dsm_handle));
		*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
					   session_dsm_handle_space);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Serialize pending syncs. */
		pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
		SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
					   pendingsyncsspace);

		/* Serialize reindex state. */
		reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
		SerializeReindexState(reindexlen, reindexspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

		/* Serialize relmapper state. */
		relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
		SerializeRelationMap(relmapperlen, relmapperspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
					   relmapperspace);

		/* Serialize uncommitted enum state. */
		uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
												 uncommittedenumslen);
		SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
					   uncommittedenumsspace);

		/* Serialize our ClientConnectionInfo. */
		clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
		SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
					   clientconninfospace);

		/* Allocate space for worker information. */
		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 */
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}

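/*
 * Illustrative sketch, not part of this file: a caller shares its own state
 * by reserving space through pcxt->estimator before calling
 * InitializeParallelDSM(), then allocating and publishing it afterward.
 * MYCODE_KEY_STATE and "len" are caller-defined:
 *
 *		shm_toc_estimate_chunk(&pcxt->estimator, len);
 *		shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		InitializeParallelDSM(pcxt);
 *		space = shm_toc_allocate(pcxt->toc, len);
 *		... fill in "space" ...
 *		shm_toc_insert(pcxt->toc, MYCODE_KEY_STATE, space);
 *
 * Workers later retrieve the chunk with shm_toc_lookup(toc,
 * MYCODE_KEY_STATE, false).
 */
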
/*
 * Reinitialize the dynamic shared memory segment for a parallel context such
 * that we could launch workers for it again.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
	FixedParallelState *fps;

	/* Wait for any old workers to exit. */
	if (pcxt->nworkers_launched > 0)
	{
		WaitForParallelWorkersToFinish(pcxt);
		WaitForParallelWorkersToExit(pcxt);
		pcxt->nworkers_launched = 0;
		if (pcxt->known_attached_workers)
		{
			pfree(pcxt->known_attached_workers);
			pcxt->known_attached_workers = NULL;
			pcxt->nknown_attached_workers = 0;
		}
	}

	/* Reset a few bits of fixed parallel state to a clean state. */
	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
	fps->last_xlog_end = 0;

	/* Recreate error queues (if they exist). */
	if (pcxt->nworkers > 0)
	{
		char	   *error_queue_space;
		int			i;

		error_queue_space =
			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
	}
}

/*
 * Reinitialize parallel workers for a parallel context such that we could
 * launch a different number of workers.  This is required for cases where
 * we need to reuse the same DSM segment, but the number of workers can
 * vary from run-to-run.
 */
void
ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
{
	/*
	 * The number of workers that need to be launched must not exceed the
	 * number of workers with which the parallel context was initialized.
	 */
	Assert(pcxt->nworkers >= nworkers_to_launch);
	pcxt->nworkers_to_launch = nworkers_to_launch;
}

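/*
 * Illustrative sketch, not part of this file: a rescan that reuses the DSM
 * segment, in the style of the executor's rescan logic, chains these calls:
 *
 *		ReinitializeParallelDSM(pcxt);
 *		ReinitializeParallelWorkers(pcxt, new_nworkers);   -- only if changed
 *		LaunchParallelWorkers(pcxt);
 *
 * where new_nworkers must not exceed the pcxt->nworkers established when
 * the context was first initialized.
 */
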
/*
 * Launch parallel workers.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	int			i;
	bool		any_registrations_failed = false;

	/* Skip this if we have no workers. */
	if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
		return;

	/* We need to be a lock group leader. */
	BecomeLockGroupLeader();

	/* If we do have workers, we'd better have a DSM segment. */
	Assert(pcxt->seg != NULL);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Configure a worker. */
	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
			 MyProcPid);
	snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
		| BGWORKER_CLASS_PARALLEL;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	sprintf(worker.bgw_library_name, "postgres");
	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
	worker.bgw_notify_pid = MyProcPid;

	/*
	 * Start workers.
	 *
	 * The caller must be able to tolerate ending up with fewer workers than
	 * expected, so there is no need to throw an error here if registration
	 * fails.  It wouldn't help much anyway, because registering the worker in
	 * no way guarantees that it will start up and initialize successfully.
	 */
	for (i = 0; i < pcxt->nworkers_to_launch; ++i)
	{
		memcpy(worker.bgw_extra, &i, sizeof(int));
		if (!any_registrations_failed &&
			RegisterDynamicBackgroundWorker(&worker,
											&pcxt->worker[i].bgwhandle))
		{
			shm_mq_set_handle(pcxt->worker[i].error_mqh,
							  pcxt->worker[i].bgwhandle);
			pcxt->nworkers_launched++;
		}
		else
		{
			/*
			 * If we weren't able to register the worker, then we've bumped up
			 * against the max_worker_processes limit, and future
			 * registrations will probably fail too, so arrange to skip them.
			 * But we still have to execute this code for the remaining slots
			 * to make sure that we forget about the error queues we budgeted
			 * for those workers.  Otherwise, we'll wait for them to start,
			 * but they never will.
			 */
			any_registrations_failed = true;
			pcxt->worker[i].bgwhandle = NULL;
			shm_mq_detach(pcxt->worker[i].error_mqh);
			pcxt->worker[i].error_mqh = NULL;
		}
	}

	/*
	 * Now that nworkers_launched has taken its final value, we can initialize
	 * known_attached_workers.
	 */
	if (pcxt->nworkers_launched > 0)
	{
		pcxt->known_attached_workers =
			palloc0(sizeof(bool) * pcxt->nworkers_launched);
		pcxt->nknown_attached_workers = 0;
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}

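/*
 * Illustrative sketch, not part of this file: because registration can fail,
 * callers should size their work by pcxt->nworkers_launched rather than by
 * the number of workers they requested, e.g.:
 *
 *		LaunchParallelWorkers(pcxt);
 *		if (pcxt->nworkers_launched == 0)
 *			... fall back to doing all of the work in the leader ...
 */
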
/*
 * Wait for all workers to attach to their error queues, and throw an error if
 * any worker fails to do this.
 *
 * Callers can assume that if this function returns successfully, then the
 * number of workers given by pcxt->nworkers_launched have initialized and
 * attached to their error queues.  Whether or not these workers are guaranteed
 * to still be running depends on what code the caller asked them to run;
 * this function does not guarantee that they have not exited.  However, it
 * does guarantee that any workers which exited must have done so cleanly and
 * after successfully performing the work with which they were tasked.
 *
 * If this function is not called, then some of the workers that were launched
 * may not have been started due to a fork() failure, or may have exited during
 * early startup prior to attaching to the error queue, so nworkers_launched
 * cannot be viewed as completely reliable.  It will never be less than the
 * number of workers which actually started, but it might be more.  Any workers
 * that failed to start will still be discovered by
 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
 * provided that function is eventually reached.
 *
 * In general, the leader process should do as much work as possible before
 * calling this function.  fork() failures and other early-startup failures
 * are very uncommon, and having the leader sit idle when it could be doing
 * useful work is undesirable.  However, if the leader needs to wait for
 * all of its workers or for a specific worker, it may want to call this
 * function before doing so.  If not, it must make some other provision for
 * the failure-to-start case, lest it wait forever.  On the other hand, a
 * leader which never waits for a worker that might not be started yet, or
 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
 * call this function at all.
 */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
	int			i;

	/* Skip this if we have no launched workers. */
	if (pcxt->nworkers_launched == 0)
		return;

	for (;;)
	{
		/*
		 * This will process any parallel messages that are pending and it may
		 * also throw an error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			BgwHandleStatus status;
			shm_mq	   *mq;
			int			rc;
			pid_t		pid;

			if (pcxt->known_attached_workers[i])
				continue;

			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
			{
				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
				continue;
			}

			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
			if (status == BGWH_STARTED)
			{
				/* Has the worker attached to the error queue? */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) != NULL)
				{
					/* Yes, so it is known to be attached. */
					pcxt->known_attached_workers[i] = true;
					++pcxt->nknown_attached_workers;
				}
			}
			else if (status == BGWH_STOPPED)
			{
				/*
				 * If the worker stopped without attaching to the error queue,
				 * throw an error.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
			}
			else
			{
				/*
				 * Worker not yet started, so we must wait.  The postmaster
				 * will notify us if the worker's state changes.  Our latch
				 * might also get set for some other reason, but if so we'll
				 * just end up waiting for the same worker again.
				 */
				rc = WaitLatch(MyLatch,
							   WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
							   -1, WAIT_EVENT_BGWORKER_STARTUP);

				if (rc & WL_LATCH_SET)
					ResetLatch(MyLatch);
			}
		}

		/* If all workers are known to have started, we're done. */
		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
		{
			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
			break;
		}
	}
}

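/*
 * Illustrative sketch, not part of this file: a leader that intends to block
 * waiting for a specific worker (say, on a condition variable the worker is
 * expected to signal) should first rule out failure-to-start:
 *
 *		LaunchParallelWorkers(pcxt);
 *		WaitForParallelWorkersToAttach(pcxt);
 *		... now it is safe to wait for any particular worker ...
 */
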
/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			nfinished = 0;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.  If we have received a message through error_mqh from
			 * the worker, we know it started up cleanly, and therefore we're
			 * certain to be notified when it exits.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
				++nfinished;
			else if (pcxt->known_attached_workers[i])
			{
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
		{
			/* If all workers are known to have finished, we're done. */
			if (nfinished >= pcxt->nworkers_launched)
			{
				Assert(nfinished == pcxt->nworkers_launched);
				break;
			}

			/*
			 * We didn't detect any living workers, but not all workers are
			 * known to have exited cleanly.  Either not all workers have
			 * launched yet, or maybe some of them failed to start or
			 * terminated abnormally.
			 */
			for (i = 0; i < pcxt->nworkers_launched; ++i)
			{
				pid_t		pid;
				shm_mq	   *mq;

				/*
				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
				 * should just keep waiting.  If it is BGWH_STOPPED, then
				 * further investigation is needed.
				 */
				if (pcxt->worker[i].error_mqh == NULL ||
					pcxt->worker[i].bgwhandle == NULL ||
					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
										   &pid) != BGWH_STOPPED)
					continue;

				/*
				 * Check whether the worker ended up stopped without ever
				 * attaching to the error queue.  If so, the postmaster was
				 * unable to fork the worker or it exited without initializing
				 * properly.  We must throw an error, since the caller may
				 * have been expecting the worker to do some work before
				 * exiting.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * The worker is stopped, but is attached to the error queue.
				 * Unless there's a bug somewhere, this will only happen when
				 * the worker writes messages and terminates after the
				 * CHECK_FOR_INTERRUPTS() near the top of this function and
				 * before the call to GetBackgroundWorkerPid().  In that case,
				 * our latch should have been set as well and the right things
				 * will happen on the next pass through the loop.
				 */
			}
		}

		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
						 WAIT_EVENT_PARALLEL_FINISH);
		ResetLatch(MyLatch);
	}

	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}

/*
 * Wait for all workers to exit.
 *
 * This function ensures that workers have been completely shut down.  The
 * difference between WaitForParallelWorkersToFinish and this function is
 * that the former just ensures that the last message sent by a worker backend
 * is received by the leader backend, whereas this ensures complete shutdown.
 */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
	int			i;

	/* Wait until the workers actually die. */
	for (i = 0; i < pcxt->nworkers_launched; ++i)
	{
		BgwHandleStatus status;

		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
			continue;

		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

		/*
		 * If the postmaster kicked the bucket, we have no chance of cleaning
		 * up safely -- we won't be able to tell when our workers are actually
		 * dead.  This doesn't necessitate a PANIC since they will all abort
		 * eventually, but we can't safely continue this session.
		 */
		if (status == BGWH_POSTMASTER_DIED)
			ereport(FATAL,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("postmaster exited during a parallel transaction")));

		/* Release memory. */
		pfree(pcxt->worker[i].bgwhandle);
		pcxt->worker[i].bgwhandle = NULL;
	}
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
	int			i;

	/*
	 * Be careful about order of operations here!  We remove the parallel
	 * context from the list before we do anything else; otherwise, if an
	 * error occurs during a subsequent step, we might try to nuke it again
	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
	 */
	dlist_delete(&pcxt->node);

	/* Kill each worker in turn, and forget their error queues. */
	if (pcxt->worker != NULL)
	{
		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			if (pcxt->worker[i].error_mqh != NULL)
			{
				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
			}
		}
	}

	/*
	 * If we have allocated a shared memory segment, detach it.  This will
	 * implicitly detach the error queues, and any other shared memory queues,
	 * stored there.
	 */
	if (pcxt->seg != NULL)
	{
		dsm_detach(pcxt->seg);
		pcxt->seg = NULL;
	}

	/*
	 * If this parallel context is actually in backend-private memory rather
	 * than shared memory, free that memory instead.
	 */
	if (pcxt->private_memory != NULL)
	{
		pfree(pcxt->private_memory);
		pcxt->private_memory = NULL;
	}

	/*
	 * We can't finish transaction commit or abort until all of the workers
	 * have exited.  This means, in particular, that we can't respond to
	 * interrupts at this stage.
	 */
	HOLD_INTERRUPTS();
	WaitForParallelWorkersToExit(pcxt);
	RESUME_INTERRUPTS();

	/* Free the worker array itself. */
	if (pcxt->worker != NULL)
	{
		pfree(pcxt->worker);
		pcxt->worker = NULL;
	}

	/* Free memory. */
	pfree(pcxt->library_name);
	pfree(pcxt->function_name);
	pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 */
bool
ParallelContextActive(void)
{
	return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
 */
void
HandleParallelMessageInterrupt(void)
{
	InterruptPending = true;
	ParallelMessagePending = true;
	SetLatch(MyLatch);
}

/*
 * Handle any queued protocol messages received from parallel workers.
 */
void
HandleParallelMessages(void)
{
	dlist_iter	iter;
	MemoryContext oldcontext;

	static MemoryContext hpm_context = NULL;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs.  It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (hpm_context == NULL)	/* first time through? */
		hpm_context = AllocSetContextCreate(TopMemoryContext,
											"HandleParallelMessages",
											ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpm_context);

	oldcontext = MemoryContextSwitchTo(hpm_context);

	/* OK to process messages.  Reset the flag saying there are more to do. */
	ParallelMessagePending = false;

	dlist_foreach(iter, &pcxt_list)
	{
		ParallelContext *pcxt;
		int			i;

		pcxt = dlist_container(ParallelContext, node, iter.cur);
		if (pcxt->worker == NULL)
			continue;

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * Read as many messages as we can from each worker, but stop when
			 * either (1) the worker's error queue goes away, which can happen
			 * if we receive a Terminate message from the worker; or (2) no
			 * more messages can be read from the worker without blocking.
			 */
			while (pcxt->worker[i].error_mqh != NULL)
			{
				shm_mq_result res;
				Size		nbytes;
				void	   *data;

				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
									 &data, true);
				if (res == SHM_MQ_WOULD_BLOCK)
					break;
				else if (res == SHM_MQ_SUCCESS)
				{
					StringInfoData msg;

					initStringInfo(&msg);
					appendBinaryStringInfo(&msg, data, nbytes);
					HandleParallelMessage(pcxt, i, &msg);
					pfree(msg.data);
				}
				else
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("lost connection to parallel worker")));
			}
		}
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpm_context);

	RESUME_INTERRUPTS();
}

/*
 * Handle a single protocol message received from a single parallel worker.
 */
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
	char		msgtype;

	if (pcxt->known_attached_workers != NULL &&
		!pcxt->known_attached_workers[i])
	{
		pcxt->known_attached_workers[i] = true;
		pcxt->nknown_attached_workers++;
	}

	msgtype = pq_getmsgbyte(msg);

	switch (msgtype)
	{
		case 'K':				/* BackendKeyData */
			{
				int32		pid = pq_getmsgint(msg, 4);

				(void) pq_getmsgint(msg, 4);	/* discard cancel key */
				(void) pq_getmsgend(msg);
				pcxt->worker[i].pid = pid;
				break;
			}

		case 'E':				/* ErrorResponse */
		case 'N':				/* NoticeResponse */
			{
				ErrorData	edata;
				ErrorContextCallback *save_error_context_stack;

				/* Parse ErrorResponse or NoticeResponse. */
				pq_parse_errornotice(msg, &edata);

				/* Death of a worker isn't enough justification for suicide. */
				edata.elevel = Min(edata.elevel, ERROR);

				/*
				 * If desired, add a context line to show that this is a
				 * message propagated from a parallel worker.  Otherwise, it
				 * can sometimes be confusing to understand what actually
				 * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
				 * because it causes test-result instability depending on
				 * whether a parallel worker is actually used or not.)
				 */
				if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
				{
					if (edata.context)
						edata.context = psprintf("%s\n%s", edata.context,
												 _("parallel worker"));
					else
						edata.context = pstrdup(_("parallel worker"));
				}

				/*
				 * Context beyond that should use the error context callbacks
				 * that were in effect when the ParallelContext was created,
				 * not the current ones.
				 */
				save_error_context_stack = error_context_stack;
				error_context_stack = pcxt->error_context_stack;

				/* Rethrow error or print notice. */
				ThrowErrorData(&edata);

				/* Not an error, so restore previous context stack. */
				error_context_stack = save_error_context_stack;

				break;
			}

		case 'A':				/* NotifyResponse */
			{
				/* Propagate NotifyResponse. */
				int32		pid;
				const char *channel;
				const char *payload;

				pid = pq_getmsgint(msg, 4);
				channel = pq_getmsgrawstring(msg);
				payload = pq_getmsgrawstring(msg);
				pq_endmessage(msg);

				NotifyMyFrontEnd(channel, payload, pid);

				break;
			}

		case 'X':				/* Terminate, indicating clean exit */
			{
				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
				break;
			}

		default:
			{
				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
					 msgtype, msg->len);
			}
	}
}

/*
 * End-of-subtransaction cleanup for parallel contexts.
 *
 * Currently, it's forbidden to enter or leave a subtransaction while
 * parallel mode is in effect, so we could just blow away everything.  But
 * we may want to relax that restriction in the future, so this code
 * contemplates that there may be multiple subtransaction IDs in pcxt_list.
 */
void
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
{
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (pcxt->subid != mySubId)
			break;
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

/*
 * End-of-transaction cleanup for parallel contexts.
 */
void
AtEOXact_Parallel(bool isCommit)
{
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

2901 rhaas 1258 ECB : /*
1259 : * Main entrypoint for parallel workers.
1260 : */
1261 : void
2901 rhaas 1262 GBC 1298 : ParallelWorkerMain(Datum main_arg)
2901 rhaas 1263 EUB : {
1264 : dsm_segment *seg;
2878 bruce 1265 : shm_toc *toc;
1266 : FixedParallelState *fps;
2878 bruce 1267 ECB : char *error_queue_space;
1268 : shm_mq *mq;
1269 : shm_mq_handle *mqh;
1270 : char *libraryspace;
1271 : char *entrypointstate;
1272 : char *library_name;
2186 tgl 1273 : char *function_name;
1274 : parallel_worker_main_type entrypt;
1275 : char *gucspace;
1276 : char *combocidspace;
1277 : char *tsnapspace;
1278 : char *asnapspace;
1279 : char *tstatespace;
1280 : char *pendingsyncsspace;
1281 : char *reindexspace;
1282 : char *relmapperspace;
1283 : char *uncommittedenumsspace;
1284 : char *clientconninfospace;
1285 : StringInfoData msgbuf;
1286 : char *session_dsm_handle_space;
1287 : Snapshot tsnapshot;
1288 : Snapshot asnapshot;
1289 :
1290 : /* Set flag to indicate that we're initializing a parallel worker. */
2732 rhaas 1291 GIC 1298 : InitializingParallelWorker = true;
1292 :
1293 : /* Establish signal handlers. */
2901 1294 1298 : pqsignal(SIGTERM, die);
1295 1298 : BackgroundWorkerUnblockSignals();
1296 :
1297 : /* Determine and set our parallel worker number. */
2712 1298 1298 : Assert(ParallelWorkerNumber == -1);
1299 1298 : memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
1300 :
1301 : /* Set up a memory context to work in, just for cleanliness. */
2901 1302 1298 : CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
2416 tgl 1303 ECB : "Parallel worker",
1304 : ALLOCSET_DEFAULT_SIZES);
1305 :
2901 rhaas 1306 : /*
1726 tgl 1307 : * Attach to the dynamic shared memory segment for the parallel query, and
1308 : * find its table of contents.
1309 : *
1310 : * Note: at this point, we have not created any ResourceOwner in this
1311 : * process. This will result in our DSM mapping surviving until process
1312 : * exit, which is fine. If there were a ResourceOwner, it would acquire
1313 : * ownership of the mapping, but we have no need for that.
2901 rhaas 1314 : */
2901 rhaas 1315 GIC 1298 : seg = dsm_attach(DatumGetUInt32(main_arg));
1316 1298 : if (seg == NULL)
2901 rhaas 1317 UIC 0 : ereport(ERROR,
1318 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1319 : errmsg("could not map dynamic shared memory segment")));
2901 rhaas 1320 GIC 1298 : toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1321 1298 : if (toc == NULL)
2901 rhaas 1322 UIC 0 : ereport(ERROR,
1323 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1324 : errmsg("invalid magic number in dynamic shared memory segment")));
1325 :
1326 : /* Look up fixed parallel state. */
2134 tgl 1327 CBC 1298 : fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
2901 rhaas 1328 1298 : MyFixedParallelState = fps;
2901 rhaas 1329 EUB :
1330 : /* Arrange to signal the leader if we exit. */
1029 andres 1331 GIC 1298 : ParallelLeaderPid = fps->parallel_leader_pid;
1029 andres 1332 CBC 1298 : ParallelLeaderBackendId = fps->parallel_leader_backend_id;
739 1333 1298 : before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
1902 rhaas 1334 EUB :
1335 : /*
1336 : * Now we can find and attach to the error queue provided for us. That's
1337 : * good, because until we do that, any errors that happen here will not be
1338 : * reported back to the process that requested that this worker be
1902 rhaas 1339 ECB : * launched.
2901 1340 : */
2134 tgl 1341 GIC 1298 : error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
2901 rhaas 1342 1298 : mq = (shm_mq *) (error_queue_space +
2878 bruce 1343 CBC 1298 : ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
2901 rhaas 1344 1298 : shm_mq_set_sender(mq, MyProc);
1345 1298 : mqh = shm_mq_attach(mq, seg, NULL);
2732 rhaas 1346 GIC 1298 : pq_redirect_to_shm_mq(seg, mqh);
1029 andres 1347 1298 : pq_set_parallel_leader(fps->parallel_leader_pid,
1348 : fps->parallel_leader_backend_id);
1349 :
1350 : /*
1351 : * Send a BackendKeyData message to the process that initiated parallelism
1352 : * so that it has access to our PID before it receives any other messages
2878 bruce 1353 ECB : * from us. Our cancel key is sent, too, since that's the way the
1354 : * protocol message is defined, but it won't actually be used for anything
1355 : * in this case.
2901 rhaas 1356 : */
2901 rhaas 1357 CBC 1298 : pq_beginmessage(&msgbuf, 'K');
2006 andres 1358 1298 : pq_sendint32(&msgbuf, (int32) MyProcPid);
1359 1298 : pq_sendint32(&msgbuf, (int32) MyCancelKey);
2901 rhaas 1360 GIC 1298 : pq_endmessage(&msgbuf);

    /*
     * Hooray! Primary initialization is complete.  Now, we need to set up our
     * backend-local state to match the original backend.
     */

    /*
     * Join locking group.  We must do this before anything that could try to
     * acquire a heavyweight lock, because any heavyweight locks acquired to
     * this point could block either directly against the parallel group
     * leader or against some process which in turn waits for a lock that
     * conflicts with the parallel group leader, causing an undetected
     * deadlock.  (If we can't join the lock group, the leader has gone away,
     * so just exit quietly.)
     */
    if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
                               fps->parallel_leader_pid))
        return;
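
    /*
     * (The leader prepares for this by calling BecomeLockGroupLeader()
     * before registering any workers; see LaunchParallelWorkers() earlier in
     * this file.)
     */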

    /*
     * Restore transaction and statement start-time timestamps.  This must
     * happen before anything that would start a transaction, else asserts in
     * xact.c will fire.
     */
    SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);

    /*
     * Identify the entry point to be called.  In theory this could result in
     * loading an additional library, though most likely the entry point is in
     * the core backend or in a library we just loaded.
     */
    entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
    library_name = entrypointstate;
    function_name = entrypointstate + strlen(library_name) + 1;

    entrypt = LookupParallelWorkerFunction(library_name, function_name);

    /* Restore database connection. */
    BackgroundWorkerInitializeConnectionByOid(fps->database_id,
                                              fps->authenticated_user_id,
                                              0);

    /*
     * Set the client encoding to the database encoding, since that is what
     * the leader will expect.
     */
    SetClientEncoding(GetDatabaseEncoding());

    /*
     * Load libraries that were loaded by the original backend.  We want to do
     * this before restoring GUCs, because the libraries might define custom
     * variables.
     */
    libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
    StartTransactionCommand();
    RestoreLibraryState(libraryspace);

    /* Restore GUC values from launching backend. */
    gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
    RestoreGUCState(gucspace);
    CommitTransactionCommand();
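
    /*
     * Illustrative sketch: on the leader side, InitializeParallelDSM()
     * serializes this state in the same order, sizing each chunk with the
     * matching Estimate...Space() call first.  Simplified:
     *
     *     library_len = EstimateLibraryStateSpace();
     *     libraryspace = shm_toc_allocate(toc, library_len);
     *     SerializeLibraryState(library_len, libraryspace);
     *     shm_toc_insert(toc, PARALLEL_KEY_LIBRARY, libraryspace);
     *
     *     guc_len = EstimateGUCStateSpace();
     *     gucspace = shm_toc_allocate(toc, guc_len);
     *     SerializeGUCState(guc_len, gucspace);
     *     shm_toc_insert(toc, PARALLEL_KEY_GUC, gucspace);
     */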

    /* Crank up a transaction state appropriate to a parallel worker. */
    tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
    StartParallelWorkerTransaction(tstatespace);

    /* Restore combo CID state. */
    combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
    RestoreComboCIDState(combocidspace);

    /* Attach to the per-session DSM segment and contained objects. */
    session_dsm_handle_space =
        shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
    AttachSession(*(dsm_handle *) session_dsm_handle_space);

    /*
     * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
     * the leader has serialized the transaction snapshot and we must restore
     * it.  At lower isolation levels, there is no transaction-lifetime
     * snapshot, but we need TransactionXmin to get set to a value which is
     * less than or equal to the xmin of every snapshot that will be used by
     * this worker.  The easiest way to accomplish that is to install the
     * active snapshot as the transaction snapshot.  Code running in this
     * parallel worker might take new snapshots via GetTransactionSnapshot()
     * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
     * snapshot older than the active snapshot.
     */
    asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
    tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
    asnapshot = RestoreSnapshot(asnapspace);
    tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
    RestoreTransactionSnapshot(tsnapshot,
                               fps->parallel_leader_pgproc);
    PushActiveSnapshot(asnapshot);
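
    /*
     * Illustrative sketch of the leader-side counterpart (simplified from
     * InitializeParallelDSM()): a transaction snapshot is serialized only at
     * the higher isolation levels, which is why the lookup above must
     * tolerate a missing TOC entry.
     *
     *     SerializeSnapshot(GetActiveSnapshot(), asnapspace);
     *     if (IsolationUsesXactSnapshot())
     *         SerializeSnapshot(GetTransactionSnapshot(), tsnapspace);
     */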

    /*
     * We've changed which tuples we can see, and must therefore invalidate
     * system caches.
     */
    InvalidateSystemCaches();

    /*
     * Restore current role id.  Skip verifying whether session user is
     * allowed to become this role and blindly restore the leader's state for
     * current role.
     */
    SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);

    /* Restore user ID and security context. */
    SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

    /* Restore temp-namespace state to ensure search path matches leader's. */
    SetTempNamespaceState(fps->temp_namespace_id,
                          fps->temp_toast_namespace_id);

    /* Restore pending syncs. */
    pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
                                       false);
    RestorePendingSyncs(pendingsyncsspace);

    /* Restore reindex state. */
    reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
    RestoreReindexState(reindexspace);

    /* Restore relmapper state. */
    relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
    RestoreRelationMap(relmapperspace);

    /* Restore uncommitted enums. */
    uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
                                           false);
    RestoreUncommittedEnums(uncommittedenumsspace);

    /* Restore the ClientConnectionInfo. */
    clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
                                         false);
    RestoreClientConnectionInfo(clientconninfospace);

    /*
     * Initialize SystemUser now that MyClientConnectionInfo is restored.
     * Also check that auth_method is actually valid, i.e. that authn_id is
     * not NULL.
     */
    if (MyClientConnectionInfo.authn_id)
        InitializeSystemUser(MyClientConnectionInfo.authn_id,
                             hba_authname(MyClientConnectionInfo.auth_method));

    /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
    AttachSerializableXact(fps->serializable_xact_handle);
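
    /*
     * (The handle itself comes from ShareSerializableXact() on the leader
     * side, stashed in the fixed state by InitializeParallelDSM(); attaching
     * is a no-op when the transaction is not SERIALIZABLE.)
     */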

    /*
     * We've initialized all of our state now; nothing should change
     * hereafter.
     */
    InitializingParallelWorker = false;
    EnterParallelMode();

    /*
     * Time to do the real work: invoke the caller-supplied code.
     */
    entrypt(seg, toc);

    /* Must exit parallel mode to pop active snapshot. */
    ExitParallelMode();

    /* Must pop active snapshot so snapmgr.c doesn't complain. */
    PopActiveSnapshot();

    /* Shut down the parallel-worker transaction. */
    EndParallelWorkerTransaction();

    /* Detach from the per-session DSM segment. */
    DetachSession();

    /* Report success. */
    pq_putmessage('X', NULL, 0);
}

/*
 * Update shared memory with the ending location of the last WAL record we
 * wrote, if it's greater than the value already stored there.
 */
void
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
{
    FixedParallelState *fps = MyFixedParallelState;

    Assert(fps != NULL);
    SpinLockAcquire(&fps->mutex);
    if (fps->last_xlog_end < last_xlog_end)
        fps->last_xlog_end = last_xlog_end;
    SpinLockRelease(&fps->mutex);
}
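
/*
 * Illustrative sketch of the expected caller (cf. the commit path in
 * xact.c): a worker reports the end of the last WAL record it wrote, so
 * that the leader can fold it into its own XactLastRecEnd and a later
 * commit will flush the worker's WAL as well.  A minimal illustration:
 *
 *     if (IsParallelWorker() && !XLogRecPtrIsInvalid(XactLastRecEnd))
 *         ParallelWorkerReportLastRecEnd(XactLastRecEnd);
 */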

/*
 * Make sure the leader tries to read from our error queue one more time.
 * This guards against the case where we exit uncleanly without sending an
 * ErrorResponse to the leader, for example because some code calls proc_exit
 * directly.
 *
 * Also explicitly detach from the dsm segment so that subsystems using
 * on_dsm_detach() have a chance to send stats before the stats subsystem is
 * shut down as part of a before_shmem_exit() hook.
 *
 * One might think this could instead be solved by carefully ordering the
 * attaching to dsm segments, so that the pgstats segments get detached from
 * later than the parallel query one.  That turns out not to work because the
 * stats hash might need to grow, which can cause new segments to be
 * allocated, which then will be detached from earlier.
 */
static void
ParallelWorkerShutdown(int code, Datum arg)
{
    SendProcSignal(ParallelLeaderPid,
                   PROCSIG_PARALLEL_MESSAGE,
                   ParallelLeaderBackendId);

    dsm_detach((dsm_segment *) DatumGetPointer(arg));
}

/*
 * Look up (and possibly load) a parallel worker entry point function.
 *
 * For functions contained in the core code, we use library name "postgres"
 * and consult the InternalParallelWorkers array.  External functions are
 * looked up, and loaded if necessary, using load_external_function().
 *
 * The point of this is to pass function names as strings across process
 * boundaries.  We can't pass actual function addresses because of the
 * possibility that the function has been loaded at a different address
 * in a different process.  This is obviously a hazard for functions in
 * loadable libraries, but it can happen even for functions in the core code
 * on platforms using EXEC_BACKEND (e.g., Windows).
 *
 * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
 * in favor of applying load_external_function() for core functions too;
 * but that raises portability issues that are not worth addressing now.
 */
static parallel_worker_main_type
LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
{
    /*
     * If the function is to be loaded from postgres itself, search the
     * InternalParallelWorkers array.
     */
    if (strcmp(libraryname, "postgres") == 0)
    {
        int         i;

        for (i = 0; i < lengthof(InternalParallelWorkers); i++)
        {
            if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
                return InternalParallelWorkers[i].fn_addr;
        }

        /* We can only reach this by programming error. */
        elog(ERROR, "internal function \"%s\" not found", funcname);
    }

    /* Otherwise load from external library. */
    return (parallel_worker_main_type)
        load_external_function(libraryname, funcname, true, NULL);
}
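
/*
 * Usage sketch (hypothetical, not part of this file): an extension named
 * "my_extension" exporting a function "my_worker_main" would be reached
 * through the external-library branch above like this.
 *
 *     EnterParallelMode();
 *     pcxt = CreateParallelContext("my_extension", "my_worker_main",
 *                                  nworkers);
 *     InitializeParallelDSM(pcxt);
 *     LaunchParallelWorkers(pcxt);
 *     WaitForParallelWorkersToFinish(pcxt);
 *     DestroyParallelContext(pcxt);
 *     ExitParallelMode();
 *
 * Each launched worker runs ParallelWorkerMain() and then reaches
 * my_worker_main(seg, toc) via the entrypt() call above.
 */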