/*-------------------------------------------------------------------------
 *
 * parallel.c
 *    Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/brin.h"
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
#include "catalog/storage.h"
#include "commands/async.h"
#include "commands/vacuum.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"

/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages. However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking. That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 */
#define PARALLEL_ERROR_QUEUE_SIZE           16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC                      0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing. Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.
 */
#define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_PENDING_SYNCS          UINT64CONST(0xFFFFFFFFFFFF000B)
#define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000C)
#define PARALLEL_KEY_RELMAPPER_STATE        UINT64CONST(0xFFFFFFFFFFFF000D)
#define PARALLEL_KEY_UNCOMMITTEDENUMS       UINT64CONST(0xFFFFFFFFFFFF000E)
#define PARALLEL_KEY_CLIENTCONNINFO         UINT64CONST(0xFFFFFFFFFFFF000F)
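
/*
 * (Each key above is written by InitializeParallelDSM() below and read back
 * with shm_toc_lookup() on the worker side in ParallelWorkerMain().)
 */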

/* Fixed-size parallel state. */
typedef struct FixedParallelState
{
    /* Fixed-size state that workers must restore. */
    Oid         database_id;
    Oid         authenticated_user_id;
    Oid         current_user_id;
    Oid         outer_user_id;
    Oid         temp_namespace_id;
    Oid         temp_toast_namespace_id;
    int         sec_context;
    bool        is_superuser;
    PGPROC     *parallel_leader_pgproc;
    pid_t       parallel_leader_pid;
    ProcNumber  parallel_leader_proc_number;
    TimestampTz xact_ts;
    TimestampTz stmt_ts;
    SerializableXactHandle serializable_xact_handle;

    /* Mutex protects remaining fields. */
    slock_t     mutex;

    /* Maximum XactLastRecEnd of any worker. */
    XLogRecPtr  last_xlog_end;
} FixedParallelState;

/*
 * Our parallel worker number. We initialize this to -1, meaning that we are
 * not a parallel worker. In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int         ParallelWorkerNumber = -1;

/* Is there a parallel message pending which we need to receive? */
volatile sig_atomic_t ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool        InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelLeaderPid;

/*
 * List of internal parallel worker entry points. We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 */
static const struct
{
    const char *fn_name;
    parallel_worker_main_type fn_addr;
}           InternalParallelWorkers[] =

{
    {
        "ParallelQueryMain", ParallelQueryMain
    },
    {
        "_bt_parallel_build_main", _bt_parallel_build_main
    },
    {
        "_brin_parallel_build_main", _brin_parallel_build_main
    },
    {
        "parallel_vacuum_main", parallel_vacuum_main
    }
};

/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);

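/*
 * A typical leader-side call sequence, sketched from the functions below
 * (error handling omitted; EnterParallelMode()/ExitParallelMode() live in
 * xact.c):
 *
 *      EnterParallelMode();
 *      pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
 *      ... shm_toc_estimate_chunk()/shm_toc_estimate_keys() for caller data ...
 *      InitializeParallelDSM(pcxt);
 *      ... shm_toc_allocate() + shm_toc_insert() for caller data ...
 *      LaunchParallelWorkers(pcxt);
 *      ... perform the leader's share of the work ...
 *      WaitForParallelWorkersToFinish(pcxt);
 *      DestroyParallelContext(pcxt);
 *      ExitParallelMode();
 */
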
/*
 * Establish a new parallel context. This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 */
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
                      int nworkers)
{
    MemoryContext oldcontext;
    ParallelContext *pcxt;

    /* It is unsafe to create a parallel context if not in parallel mode. */
    Assert(IsInParallelMode());

    /* Number of workers should be non-negative. */
    Assert(nworkers >= 0);

    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Initialize a new ParallelContext. */
    pcxt = palloc0(sizeof(ParallelContext));
    pcxt->subid = GetCurrentSubTransactionId();
    pcxt->nworkers = nworkers;
    pcxt->nworkers_to_launch = nworkers;
    pcxt->library_name = pstrdup(library_name);
    pcxt->function_name = pstrdup(function_name);
    pcxt->error_context_stack = error_context_stack;
    shm_toc_initialize_estimator(&pcxt->estimator);
    dlist_push_head(&pcxt_list, &pcxt->node);

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);

    return pcxt;
}

/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
    MemoryContext oldcontext;
    Size        library_len = 0;
    Size        guc_len = 0;
    Size        combocidlen = 0;
    Size        tsnaplen = 0;
    Size        asnaplen = 0;
    Size        tstatelen = 0;
    Size        pendingsyncslen = 0;
    Size        reindexlen = 0;
    Size        relmapperlen = 0;
    Size        uncommittedenumslen = 0;
    Size        clientconninfolen = 0;
    Size        segsize = 0;
    int         i;
    FixedParallelState *fps;
    dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
    Snapshot    transaction_snapshot = GetTransactionSnapshot();
    Snapshot    active_snapshot = GetActiveSnapshot();

    /* We might be running in a very short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Allow space to store the fixed-size parallel state. */
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Normally, the user will have requested at least one worker process, but
     * if by chance they have not, we can skip a bunch of things here.
     */
    if (pcxt->nworkers > 0)
    {
        /* Get (or create) the per-session DSM segment's handle. */
        session_dsm_handle = GetSessionDsmHandle();

        /*
         * If we weren't able to create a per-session DSM segment, then we can
         * continue but we can't safely launch any workers because their
         * record typmods would be incompatible so they couldn't exchange
         * tuples.
         */
        if (session_dsm_handle == DSM_HANDLE_INVALID)
            pcxt->nworkers = 0;
    }

    if (pcxt->nworkers > 0)
    {
        /* Estimate space for various kinds of state sharing. */
        library_len = EstimateLibraryStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, library_len);
        guc_len = EstimateGUCStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
        combocidlen = EstimateComboCIDStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
        if (IsolationUsesXactSnapshot())
        {
            tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
            shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
        }
        asnaplen = EstimateSnapshotSpace(active_snapshot);
        shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
        tstatelen = EstimateTransactionStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
        shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
        pendingsyncslen = EstimatePendingSyncsSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
        reindexlen = EstimateReindexStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
        relmapperlen = EstimateRelationMapSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
        uncommittedenumslen = EstimateUncommittedEnumsSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
        clientconninfolen = EstimateClientConnectionInfoSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
        /* If you add more chunks here, you probably need to add keys. */
        shm_toc_estimate_keys(&pcxt->estimator, 12);

        /* Estimate space needed for error queues. */
        StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
                         PARALLEL_ERROR_QUEUE_SIZE,
                         "parallel error queue size not buffer-aligned");
        shm_toc_estimate_chunk(&pcxt->estimator,
                               mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                        pcxt->nworkers));
        shm_toc_estimate_keys(&pcxt->estimator, 1);

        /* Estimate how much we'll need for the entrypoint info. */
        shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
                               strlen(pcxt->function_name) + 2);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }

    /*
     * Create DSM and initialize with new table of contents. But if the user
     * didn't request any workers, then don't bother creating a dynamic shared
     * memory segment; instead, just use backend-private memory.
     *
     * Also, if we can't create a dynamic shared memory segment because the
     * maximum number of segments has already been created, then fall back to
     * backend-private memory, and plan not to use any workers. We hope this
     * won't happen very often, but it's better to abandon the use of
     * parallelism than to fail outright.
     */
    segsize = shm_toc_estimate(&pcxt->estimator);
    if (pcxt->nworkers > 0)
        pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
    if (pcxt->seg != NULL)
        pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
                                   dsm_segment_address(pcxt->seg),
                                   segsize);
    else
    {
        pcxt->nworkers = 0;
        pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
        pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
                                   segsize);
    }

    /* Initialize fixed-size state in shared memory. */
    fps = (FixedParallelState *)
        shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
    fps->database_id = MyDatabaseId;
    fps->authenticated_user_id = GetAuthenticatedUserId();
    fps->outer_user_id = GetCurrentRoleId();
    fps->is_superuser = current_role_is_superuser;
    GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
    GetTempNamespaceState(&fps->temp_namespace_id,
                          &fps->temp_toast_namespace_id);
    fps->parallel_leader_pgproc = MyProc;
    fps->parallel_leader_pid = MyProcPid;
    fps->parallel_leader_proc_number = MyProcNumber;
    fps->xact_ts = GetCurrentTransactionStartTimestamp();
    fps->stmt_ts = GetCurrentStatementStartTimestamp();
    fps->serializable_xact_handle = ShareSerializableXact();
    SpinLockInit(&fps->mutex);
    fps->last_xlog_end = 0;
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

    /* We can skip the rest of this if we're not budgeting for any workers. */
    if (pcxt->nworkers > 0)
    {
        char       *libraryspace;
        char       *gucspace;
        char       *combocidspace;
        char       *tsnapspace;
        char       *asnapspace;
        char       *tstatespace;
        char       *pendingsyncsspace;
        char       *reindexspace;
        char       *relmapperspace;
        char       *error_queue_space;
        char       *session_dsm_handle_space;
        char       *entrypointstate;
        char       *uncommittedenumsspace;
        char       *clientconninfospace;
        Size        lnamelen;

        /* Serialize shared libraries we have loaded. */
        libraryspace = shm_toc_allocate(pcxt->toc, library_len);
        SerializeLibraryState(library_len, libraryspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

        /* Serialize GUC settings. */
        gucspace = shm_toc_allocate(pcxt->toc, guc_len);
        SerializeGUCState(guc_len, gucspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

        /* Serialize combo CID state. */
        combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
        SerializeComboCIDState(combocidlen, combocidspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

        /*
         * Serialize the transaction snapshot if the transaction isolation
         * level uses a transaction snapshot.
         */
        if (IsolationUsesXactSnapshot())
        {
            tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
            SerializeSnapshot(transaction_snapshot, tsnapspace);
            shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
                           tsnapspace);
        }

        /* Serialize the active snapshot. */
        asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
        SerializeSnapshot(active_snapshot, asnapspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

        /* Provide the handle for per-session segment. */
        session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
                                                    sizeof(dsm_handle));
        *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
                       session_dsm_handle_space);

        /* Serialize transaction state. */
        tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
        SerializeTransactionState(tstatelen, tstatespace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

        /* Serialize pending syncs. */
        pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
        SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
                       pendingsyncsspace);

        /* Serialize reindex state. */
        reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
        SerializeReindexState(reindexlen, reindexspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

        /* Serialize relmapper state. */
        relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
        SerializeRelationMap(relmapperlen, relmapperspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
                       relmapperspace);

        /* Serialize uncommitted enum state. */
        uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
                                                 uncommittedenumslen);
        SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
                       uncommittedenumsspace);

        /* Serialize our ClientConnectionInfo. */
        clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
        SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
                       clientconninfospace);

        /* Allocate space for worker information. */
        pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

        /*
         * Establish error queues in dynamic shared memory.
         *
         * These queues should be used only for transmitting ErrorResponse,
         * NoticeResponse, and NotifyResponse protocol messages. Tuple data
         * should be transmitted via separate (possibly larger?) queues.
         */
        error_queue_space =
            shm_toc_allocate(pcxt->toc,
                             mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                      pcxt->nworkers));
        for (i = 0; i < pcxt->nworkers; ++i)
        {
            char       *start;
            shm_mq     *mq;

            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
            shm_mq_set_receiver(mq, MyProc);
            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
        }
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

        /*
         * Serialize entrypoint information. It's unsafe to pass function
         * pointers across processes, as the function pointer may be different
         * in each process in EXEC_BACKEND builds, so we always pass library
         * and function name. (We use library name "postgres" for functions
         * in the core backend.)
         */
        lnamelen = strlen(pcxt->library_name);
        entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
                                           strlen(pcxt->function_name) + 2);
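        /*
         * The chunk holds both NUL-terminated strings back to back, e.g.
         * "postgres\0ParallelQueryMain\0"; ParallelWorkerMain() splits them
         * apart again with strlen().
         */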
        strcpy(entrypointstate, pcxt->library_name);
        strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
    }

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);
}

/*
 * Reinitialize the dynamic shared memory segment for a parallel context such
 * that we could launch workers for it again.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
    FixedParallelState *fps;

    /* Wait for any old workers to exit. */
    if (pcxt->nworkers_launched > 0)
    {
        WaitForParallelWorkersToFinish(pcxt);
        WaitForParallelWorkersToExit(pcxt);
        pcxt->nworkers_launched = 0;
        if (pcxt->known_attached_workers)
        {
            pfree(pcxt->known_attached_workers);
            pcxt->known_attached_workers = NULL;
            pcxt->nknown_attached_workers = 0;
        }
    }

    /* Reset a few bits of fixed parallel state to a clean state. */
    fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
    fps->last_xlog_end = 0;

    /* Recreate error queues (if they exist). */
    if (pcxt->nworkers > 0)
    {
        char       *error_queue_space;
        int         i;

        error_queue_space =
            shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
        for (i = 0; i < pcxt->nworkers; ++i)
        {
            char       *start;
            shm_mq     *mq;

            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
            shm_mq_set_receiver(mq, MyProc);
            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
        }
    }
}

/*
 * Reinitialize parallel workers for a parallel context such that we could
 * launch a different number of workers. This is required for cases where
 * we need to reuse the same DSM segment, but the number of workers can
 * vary from run-to-run.
 */
void
ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
{
    /*
     * The number of workers to be launched must not exceed the number of
     * workers with which the parallel context was initialized.
     */
    Assert(pcxt->nworkers >= nworkers_to_launch);
    pcxt->nworkers_to_launch = nworkers_to_launch;
}

/*
 * Launch parallel workers.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
    MemoryContext oldcontext;
    BackgroundWorker worker;
    int         i;
    bool        any_registrations_failed = false;

    /* Skip this if we have no workers. */
    if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
        return;

    /* We need to be a lock group leader. */
    BecomeLockGroupLeader();

    /* If we do have workers, we'd better have a DSM segment. */
    Assert(pcxt->seg != NULL);

    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Configure a worker. */
    memset(&worker, 0, sizeof(worker));
    snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
             MyProcPid);
    snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
    worker.bgw_flags =
        BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
        | BGWORKER_CLASS_PARALLEL;
    worker.bgw_start_time = BgWorkerStart_ConsistentState;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    sprintf(worker.bgw_library_name, "postgres");
    sprintf(worker.bgw_function_name, "ParallelWorkerMain");
    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
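    /*
     * bgw_notify_pid asks the postmaster to signal us when a worker starts
     * or stops; the wait loops elsewhere in this file rely on that.
     */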
    worker.bgw_notify_pid = MyProcPid;

    /*
     * Start workers.
     *
     * The caller must be able to tolerate ending up with fewer workers than
     * expected, so there is no need to throw an error here if registration
     * fails. It wouldn't help much anyway, because registering the worker in
     * no way guarantees that it will start up and initialize successfully.
     */
    for (i = 0; i < pcxt->nworkers_to_launch; ++i)
    {
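        /* Pass the worker number via bgw_extra; ParallelWorkerMain() reads it back. */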
        memcpy(worker.bgw_extra, &i, sizeof(int));
        if (!any_registrations_failed &&
            RegisterDynamicBackgroundWorker(&worker,
                                            &pcxt->worker[i].bgwhandle))
        {
            shm_mq_set_handle(pcxt->worker[i].error_mqh,
                              pcxt->worker[i].bgwhandle);
            pcxt->nworkers_launched++;
        }
        else
        {
            /*
             * If we weren't able to register the worker, then we've bumped up
             * against the max_worker_processes limit, and future
             * registrations will probably fail too, so arrange to skip them.
             * But we still have to execute this code for the remaining slots
             * to make sure that we forget about the error queues we budgeted
             * for those workers. Otherwise, we'll wait for them to start,
             * but they never will.
             */
            any_registrations_failed = true;
            pcxt->worker[i].bgwhandle = NULL;
            shm_mq_detach(pcxt->worker[i].error_mqh);
            pcxt->worker[i].error_mqh = NULL;
        }
    }

    /*
     * Now that nworkers_launched has taken its final value, we can initialize
     * known_attached_workers.
     */
    if (pcxt->nworkers_launched > 0)
    {
        pcxt->known_attached_workers =
            palloc0(sizeof(bool) * pcxt->nworkers_launched);
        pcxt->nknown_attached_workers = 0;
    }

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);
}

/*
 * Wait for all workers to attach to their error queues, and throw an error if
 * any worker fails to do this.
 *
 * Callers can assume that if this function returns successfully, then the
 * number of workers given by pcxt->nworkers_launched have initialized and
 * attached to their error queues. Whether or not these workers are guaranteed
 * to still be running depends on what code the caller asked them to run;
 * this function does not guarantee that they have not exited. However, it
 * does guarantee that any workers which exited must have done so cleanly and
 * after successfully performing the work with which they were tasked.
 *
 * If this function is not called, then some of the workers that were launched
 * may not have been started due to a fork() failure, or may have exited during
 * early startup prior to attaching to the error queue, so nworkers_launched
 * cannot be viewed as completely reliable. It will never be less than the
 * number of workers which actually started, but it might be more. Any workers
 * that failed to start will still be discovered by
 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
 * provided that function is eventually reached.
 *
 * In general, the leader process should do as much work as possible before
 * calling this function. fork() failures and other early-startup failures
 * are very uncommon, and having the leader sit idle when it could be doing
 * useful work is undesirable. However, if the leader needs to wait for
 * all of its workers or for a specific worker, it may want to call this
 * function before doing so. If not, it must make some other provision for
 * the failure-to-start case, lest it wait forever. On the other hand, a
 * leader which never waits for a worker that might not be started yet, or
 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
 * call this function at all.
 */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
    int         i;

    /* Skip this if we have no launched workers. */
    if (pcxt->nworkers_launched == 0)
        return;

    for (;;)
    {
        /*
         * This will process any parallel messages that are pending and it may
         * also throw an error propagated from a worker.
         */
        CHECK_FOR_INTERRUPTS();

        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            BgwHandleStatus status;
            shm_mq     *mq;
            int         rc;
            pid_t       pid;

            if (pcxt->known_attached_workers[i])
                continue;

            /*
             * If error_mqh is NULL, then the worker has already exited
             * cleanly.
             */
            if (pcxt->worker[i].error_mqh == NULL)
            {
                pcxt->known_attached_workers[i] = true;
                ++pcxt->nknown_attached_workers;
                continue;
            }

            status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
            if (status == BGWH_STARTED)
            {
                /* Has the worker attached to the error queue? */
                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                if (shm_mq_get_sender(mq) != NULL)
                {
                    /* Yes, so it is known to be attached. */
                    pcxt->known_attached_workers[i] = true;
                    ++pcxt->nknown_attached_workers;
                }
            }
            else if (status == BGWH_STOPPED)
            {
                /*
                 * If the worker stopped without attaching to the error queue,
                 * throw an error.
                 */
                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                if (shm_mq_get_sender(mq) == NULL)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("parallel worker failed to initialize"),
                             errhint("More details may be available in the server log.")));

                pcxt->known_attached_workers[i] = true;
                ++pcxt->nknown_attached_workers;
            }
            else
            {
                /*
                 * Worker not yet started, so we must wait. The postmaster
                 * will notify us if the worker's state changes. Our latch
                 * might also get set for some other reason, but if so we'll
                 * just end up waiting for the same worker again.
                 */
                rc = WaitLatch(MyLatch,
                               WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
                               -1, WAIT_EVENT_BGWORKER_STARTUP);

                if (rc & WL_LATCH_SET)
                    ResetLatch(MyLatch);
            }
        }

        /* If all workers are known to have started, we're done. */
        if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
        {
            Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
            break;
        }
    }
}

/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards. We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
    for (;;)
    {
        bool        anyone_alive = false;
        int         nfinished = 0;
        int         i;

        /*
         * This will process any parallel messages that are pending, which may
         * change the outcome of the loop that follows. It may also throw an
         * error propagated from a worker.
         */
        CHECK_FOR_INTERRUPTS();

        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            /*
             * If error_mqh is NULL, then the worker has already exited
             * cleanly. If we have received a message through error_mqh from
             * the worker, we know it started up cleanly, and therefore we're
             * certain to be notified when it exits.
             */
            if (pcxt->worker[i].error_mqh == NULL)
                ++nfinished;
            else if (pcxt->known_attached_workers[i])
            {
                anyone_alive = true;
                break;
            }
        }

        if (!anyone_alive)
        {
            /* If all workers are known to have finished, we're done. */
            if (nfinished >= pcxt->nworkers_launched)
            {
                Assert(nfinished == pcxt->nworkers_launched);
                break;
            }

            /*
             * We didn't detect any living workers, but not all workers are
             * known to have exited cleanly. Either not all workers have
             * launched yet, or maybe some of them failed to start or
             * terminated abnormally.
             */
            for (i = 0; i < pcxt->nworkers_launched; ++i)
            {
                pid_t       pid;
                shm_mq     *mq;

                /*
                 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
                 * should just keep waiting. If it is BGWH_STOPPED, then
                 * further investigation is needed.
                 */
                if (pcxt->worker[i].error_mqh == NULL ||
                    pcxt->worker[i].bgwhandle == NULL ||
                    GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
                                           &pid) != BGWH_STOPPED)
                    continue;

                /*
                 * Check whether the worker ended up stopped without ever
                 * attaching to the error queue. If so, the postmaster was
                 * unable to fork the worker or it exited without initializing
                 * properly. We must throw an error, since the caller may
                 * have been expecting the worker to do some work before
                 * exiting.
                 */
                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                if (shm_mq_get_sender(mq) == NULL)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("parallel worker failed to initialize"),
                             errhint("More details may be available in the server log.")));

                /*
                 * The worker is stopped, but is attached to the error queue.
                 * Unless there's a bug somewhere, this will only happen when
                 * the worker writes messages and terminates after the
                 * CHECK_FOR_INTERRUPTS() near the top of this function and
                 * before the call to GetBackgroundWorkerPid(). In that case,
                 * our latch should have been set as well and the right things
                 * will happen on the next pass through the loop.
                 */
            }
        }

        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
                         WAIT_EVENT_PARALLEL_FINISH);
        ResetLatch(MyLatch);
    }

    if (pcxt->toc != NULL)
    {
        FixedParallelState *fps;

        fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
        if (fps->last_xlog_end > XactLastRecEnd)
            XactLastRecEnd = fps->last_xlog_end;
    }
}

/*
 * Wait for all workers to exit.
 *
 * This function ensures that workers have been completely shut down. The
 * difference between WaitForParallelWorkersToFinish and this function is
 * that the former just ensures that the last message sent by a worker backend
 * is received by the leader backend, whereas this ensures the complete
 * shutdown.
 */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
    int         i;

    /* Wait until the workers actually die. */
    for (i = 0; i < pcxt->nworkers_launched; ++i)
    {
        BgwHandleStatus status;

        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
            continue;

        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

        /*
         * If the postmaster kicked the bucket, we have no chance of cleaning
         * up safely -- we won't be able to tell when our workers are actually
         * dead. This doesn't necessitate a PANIC since they will all abort
         * eventually, but we can't safely continue this session.
         */
        if (status == BGWH_POSTMASTER_DIED)
            ereport(FATAL,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("postmaster exited during a parallel transaction")));

        /* Release memory. */
        pfree(pcxt->worker[i].bgwhandle);
        pcxt->worker[i].bgwhandle = NULL;
    }
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function. When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
    int         i;

    /*
     * Be careful about order of operations here! We remove the parallel
     * context from the list before we do anything else; otherwise, if an
     * error occurs during a subsequent step, we might try to nuke it again
     * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     */
    dlist_delete(&pcxt->node);

    /* Kill each worker in turn, and forget their error queues. */
    if (pcxt->worker != NULL)
    {
        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            if (pcxt->worker[i].error_mqh != NULL)
            {
                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

                shm_mq_detach(pcxt->worker[i].error_mqh);
                pcxt->worker[i].error_mqh = NULL;
            }
        }
    }

    /*
     * If we have allocated a shared memory segment, detach it. This will
     * implicitly detach the error queues, and any other shared memory queues,
     * stored there.
     */
    if (pcxt->seg != NULL)
    {
        dsm_detach(pcxt->seg);
        pcxt->seg = NULL;
    }

    /*
     * If this parallel context is actually in backend-private memory rather
     * than shared memory, free that memory instead.
     */
    if (pcxt->private_memory != NULL)
    {
        pfree(pcxt->private_memory);
        pcxt->private_memory = NULL;
    }

    /*
     * We can't finish transaction commit or abort until all of the workers
     * have exited. This means, in particular, that we can't respond to
     * interrupts at this stage.
     */
    HOLD_INTERRUPTS();
    WaitForParallelWorkersToExit(pcxt);
    RESUME_INTERRUPTS();

    /* Free the worker array itself. */
    if (pcxt->worker != NULL)
    {
        pfree(pcxt->worker);
        pcxt->worker = NULL;
    }

    /* Free memory. */
    pfree(pcxt->library_name);
    pfree(pcxt->function_name);
    pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 */
bool
ParallelContextActive(void)
{
    return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler! All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
 */
void
HandleParallelMessageInterrupt(void)
{
    InterruptPending = true;
    ParallelMessagePending = true;
    SetLatch(MyLatch);
}

/*
 * Handle any queued protocol messages received from parallel workers.
 */
void
HandleParallelMessages(void)
{
    dlist_iter  iter;
    MemoryContext oldcontext;

    static MemoryContext hpm_context = NULL;

    /*
     * This is invoked from ProcessInterrupts(), and since some of the
     * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
     * for recursive calls if more signals are received while this runs. It's
     * unclear that recursive entry would be safe, and it doesn't seem useful
     * even if it is safe, so let's block interrupts until done.
     */
    HOLD_INTERRUPTS();

    /*
     * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
     * don't want to risk leaking data into long-lived contexts, so let's do
     * our work here in a private context that we can reset on each use.
     */
    if (hpm_context == NULL)    /* first time through? */
        hpm_context = AllocSetContextCreate(TopMemoryContext,
                                            "HandleParallelMessages",
                                            ALLOCSET_DEFAULT_SIZES);
    else
        MemoryContextReset(hpm_context);

    oldcontext = MemoryContextSwitchTo(hpm_context);

    /* OK to process messages. Reset the flag saying there are more to do. */
    ParallelMessagePending = false;

    dlist_foreach(iter, &pcxt_list)
    {
        ParallelContext *pcxt;
        int         i;

        pcxt = dlist_container(ParallelContext, node, iter.cur);
        if (pcxt->worker == NULL)
            continue;

        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            /*
             * Read as many messages as we can from each worker, but stop when
             * either (1) the worker's error queue goes away, which can happen
             * if we receive a Terminate message from the worker; or (2) no
             * more messages can be read from the worker without blocking.
             */
            while (pcxt->worker[i].error_mqh != NULL)
            {
                shm_mq_result res;
                Size        nbytes;
                void       *data;

                res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
                                     &data, true);
                if (res == SHM_MQ_WOULD_BLOCK)
                    break;
                else if (res == SHM_MQ_SUCCESS)
                {
                    StringInfoData msg;

                    initStringInfo(&msg);
                    appendBinaryStringInfo(&msg, data, nbytes);
                    HandleParallelMessage(pcxt, i, &msg);
                    pfree(msg.data);
                }
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("lost connection to parallel worker")));
            }
        }
    }

    MemoryContextSwitchTo(oldcontext);

    /* Might as well clear the context on our way out */
    MemoryContextReset(hpm_context);

    RESUME_INTERRUPTS();
}

/*
 * Handle a single protocol message received from a single parallel worker.
 */
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
    char        msgtype;

    if (pcxt->known_attached_workers != NULL &&
        !pcxt->known_attached_workers[i])
    {
        pcxt->known_attached_workers[i] = true;
        pcxt->nknown_attached_workers++;
    }

    msgtype = pq_getmsgbyte(msg);

    switch (msgtype)
    {
        case PqMsg_ErrorResponse:
        case PqMsg_NoticeResponse:
            {
                ErrorData   edata;
                ErrorContextCallback *save_error_context_stack;

                /* Parse ErrorResponse or NoticeResponse. */
                pq_parse_errornotice(msg, &edata);

                /* Death of a worker isn't enough justification for suicide. */
                edata.elevel = Min(edata.elevel, ERROR);

                /*
                 * If desired, add a context line to show that this is a
                 * message propagated from a parallel worker. Otherwise, it
                 * can sometimes be confusing to understand what actually
                 * happened. (We don't do this in DEBUG_PARALLEL_REGRESS mode
                 * because it causes test-result instability depending on
                 * whether a parallel worker is actually used or not.)
                 */
                if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
                {
                    if (edata.context)
                        edata.context = psprintf("%s\n%s", edata.context,
                                                 _("parallel worker"));
                    else
                        edata.context = pstrdup(_("parallel worker"));
                }

                /*
                 * Context beyond that should use the error context callbacks
                 * that were in effect when the ParallelContext was created,
                 * not the current ones.
                 */
                save_error_context_stack = error_context_stack;
                error_context_stack = pcxt->error_context_stack;

                /* Rethrow error or print notice. */
                ThrowErrorData(&edata);

                /* Not an error, so restore previous context stack. */
                error_context_stack = save_error_context_stack;

                break;
            }

        case PqMsg_NotificationResponse:
            {
                /* Propagate NotifyResponse. */
                int32       pid;
                const char *channel;
                const char *payload;

                pid = pq_getmsgint(msg, 4);
                channel = pq_getmsgrawstring(msg);
                payload = pq_getmsgrawstring(msg);
                pq_endmessage(msg);

                NotifyMyFrontEnd(channel, payload, pid);

                break;
            }

        case 'P':               /* Parallel progress reporting */
            {
                /*
                 * Only incremental progress reporting is currently supported.
                 * However, it's possible to add more fields to the message to
                 * allow for handling of other backend progress APIs.
                 */
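                /* Message body: progress-array index (int32), then the increment (int64). */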
                int         index = pq_getmsgint(msg, 4);
                int64       incr = pq_getmsgint64(msg);

                pq_getmsgend(msg);

                pgstat_progress_incr_param(index, incr);

                break;
            }

        case PqMsg_Terminate:
            {
                shm_mq_detach(pcxt->worker[i].error_mqh);
                pcxt->worker[i].error_mqh = NULL;
                break;
            }

        default:
            {
                elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                     msgtype, msg->len);
            }
    }
}

/*
 * End-of-subtransaction cleanup for parallel contexts.
 *
 * Here we remove only parallel contexts initiated within the current
 * subtransaction.
 */
void
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
{
    while (!dlist_is_empty(&pcxt_list))
    {
        ParallelContext *pcxt;

        pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
        if (pcxt->subid != mySubId)
            break;
        if (isCommit)
            elog(WARNING, "leaked parallel context");
        DestroyParallelContext(pcxt);
    }
}

/*
 * End-of-transaction cleanup for parallel contexts.
 *
 * We nuke all remaining parallel contexts.
 */
void
AtEOXact_Parallel(bool isCommit)
{
    while (!dlist_is_empty(&pcxt_list))
    {
        ParallelContext *pcxt;

        pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
        if (isCommit)
            elog(WARNING, "leaked parallel context");
        DestroyParallelContext(pcxt);
    }
}

/*
 * Main entrypoint for parallel workers.
 */
void
ParallelWorkerMain(Datum main_arg)
{
    dsm_segment *seg;
    shm_toc    *toc;
    FixedParallelState *fps;
    char       *error_queue_space;
    shm_mq     *mq;
    shm_mq_handle *mqh;
    char       *libraryspace;
    char       *entrypointstate;
    char       *library_name;
    char       *function_name;
    parallel_worker_main_type entrypt;
    char       *gucspace;
    char       *combocidspace;
    char       *tsnapspace;
    char       *asnapspace;
    char       *tstatespace;
    char       *pendingsyncsspace;
    char       *reindexspace;
    char       *relmapperspace;
    char       *uncommittedenumsspace;
    char       *clientconninfospace;
    char       *session_dsm_handle_space;
    Snapshot    tsnapshot;
    Snapshot    asnapshot;

    /* Set flag to indicate that we're initializing a parallel worker. */
    InitializingParallelWorker = true;

    /* Establish signal handlers. */
    pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();

    /* Determine and set our parallel worker number. */
    Assert(ParallelWorkerNumber == -1);
    memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

    /* Set up a memory context to work in, just for cleanliness. */
    CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
                                                 "Parallel worker",
                                                 ALLOCSET_DEFAULT_SIZES);

    /*
     * Attach to the dynamic shared memory segment for the parallel query, and
     * find its table of contents.
     *
     * Note: at this point, we have not created any ResourceOwner in this
     * process. This will result in our DSM mapping surviving until process
     * exit, which is fine. If there were a ResourceOwner, it would acquire
     * ownership of the mapping, but we have no need for that.
     */
    seg = dsm_attach(DatumGetUInt32(main_arg));
    if (seg == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("could not map dynamic shared memory segment")));
    toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    if (toc == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("invalid magic number in dynamic shared memory segment")));

    /* Look up fixed parallel state. */
    fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
    MyFixedParallelState = fps;

    /* Arrange to signal the leader if we exit. */
    ParallelLeaderPid = fps->parallel_leader_pid;
    ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
    before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));

    /*
     * Now we can find and attach to the error queue provided for us. That's
     * good, because until we do that, any errors that happen here will not be
     * reported back to the process that requested that this worker be
     * launched.
     */
    error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
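    /*
     * Our queue starts at offset ParallelWorkerNumber *
     * PARALLEL_ERROR_QUEUE_SIZE, mirroring the per-worker layout created in
     * InitializeParallelDSM().
     */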
3272 rhaas@postgresql.org 1350 : 1322 : mq = (shm_mq *) (error_queue_space +
3249 bruce@momjian.us 1351 : 1322 : ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
3272 rhaas@postgresql.org 1352 : 1322 : shm_mq_set_sender(mq, MyProc);
1353 : 1322 : mqh = shm_mq_attach(mq, seg, NULL);
3103 1354 : 1322 : pq_redirect_to_shm_mq(seg, mqh);
1400 andres@anarazel.de 1355 : 1322 : pq_set_parallel_leader(fps->parallel_leader_pid,
1356 : : fps->parallel_leader_proc_number);
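/*
 * The address arithmetic above mirrors how the leader lays out the error
 * queues: one fixed-size slot per worker within a single TOC entry. A
 * sketch of the leader side (variable names illustrative):
 *
 *     mq = shm_mq_create(error_queue_space +
 *                        i * PARALLEL_ERROR_QUEUE_SIZE,
 *                        PARALLEL_ERROR_QUEUE_SIZE);
 *     shm_mq_set_receiver(mq, MyProc);
 */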
1357 : :
1358 : : /*
1359 : : * Hooray! Primary initialization is complete. Now, we need to set up our
1360 : : * backend-local state to match the original backend.
1361 : : */
1362 : :
1363 : : /*
1364 : : * Join locking group. We must do this before anything that could try to
1365 : : * acquire a heavyweight lock, because any heavyweight locks acquired to
1366 : : * this point could block either directly against the parallel group
1367 : : * leader or against some process which in turn waits for a lock that
1368 : : * conflicts with the parallel group leader, causing an undetected
1369 : : * deadlock. (If we can't join the lock group, the leader has gone away,
1370 : : * so just exit quietly.)
1371 : : */
1372 [ - + ]: 1322 : if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
1373 : : fps->parallel_leader_pid))
2989 rhaas@postgresql.org 1374 :UBC 0 : return;
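/*
 * A sketch of the leader's half of this handshake (from
 * LaunchParallelWorkers in this file), done before any worker is
 * registered so that the lock group already exists by the time we try to
 * join it here:
 *
 *     BecomeLockGroupLeader();
 */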
1375 : :
1376 : : /*
1377 : : * Restore transaction and statement start-time timestamps. This must
1378 : : * happen before anything that would start a transaction, else asserts in
1379 : : * xact.c will fire.
1380 : : */
2017 tgl@sss.pgh.pa.us 1381 :CBC 1322 : SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
1382 : :
1383 : : /*
1384 : : * Identify the entry point to be called. In theory this could result in
1385 : : * loading an additional library, though most likely the entry point is in
1386 : : * the core backend or in a library we just loaded.
1387 : : */
2505 1388 : 1322 : entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
2557 1389 : 1322 : library_name = entrypointstate;
1390 : 1322 : function_name = entrypointstate + strlen(library_name) + 1;
1391 : :
1392 : 1322 : entrypt = LookupParallelWorkerFunction(library_name, function_name);
1393 : :
1394 : : /* Restore database connection. */
3272 rhaas@postgresql.org 1395 : 1322 : BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1396 : : fps->authenticated_user_id,
1397 : : 0);
1398 : :
1399 : : /*
1400 : : * Set the client encoding to the database encoding, since that is what
1401 : : * the leader will expect.
1402 : : */
2845 1403 : 1322 : SetClientEncoding(GetDatabaseEncoding());
1404 : :
1405 : : /*
1406 : : * Load libraries that were loaded by original backend. We want to do
1407 : : * this before restoring GUCs, because the libraries might define custom
1408 : : * variables.
1409 : : */
2033 tmunro@postgresql.or 1410 : 1322 : libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1411 : 1322 : StartTransactionCommand();
1412 : 1322 : RestoreLibraryState(libraryspace);
1413 : :
1414 : : /* Restore GUC values from launching backend. */
2505 tgl@sss.pgh.pa.us 1415 : 1322 : gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
3272 rhaas@postgresql.org 1416 : 1322 : RestoreGUCState(gucspace);
1417 : 1322 : CommitTransactionCommand();
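/*
 * The ordering above matters: a library's _PG_init() may define custom
 * GUCs that RestoreGUCState() must be able to set. A hypothetical
 * extension might register one like this (names illustrative, not from
 * this file):
 *
 *     DefineCustomIntVariable("my_ext.batch_size",
 *                             "Batch size for my_ext workers.", NULL,
 *                             &my_ext_batch_size, 64, 1, 1024,
 *                             PGC_USERSET, 0, NULL, NULL, NULL);
 */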
1418 : :
1419 : : /* Crank up a transaction state appropriate to a parallel worker. */
2505 tgl@sss.pgh.pa.us 1420 : 1322 : tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
3272 rhaas@postgresql.org 1421 : 1322 : StartParallelWorkerTransaction(tstatespace);
1422 : :
1423 : : /* Restore combo CID state. */
2505 tgl@sss.pgh.pa.us 1424 : 1322 : combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
3272 rhaas@postgresql.org 1425 : 1322 : RestoreComboCIDState(combocidspace);
1426 : :
1427 : : /* Attach to the per-session DSM segment and contained objects. */
1428 : : session_dsm_handle_space =
2404 andres@anarazel.de 1429 : 1322 : shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1430 : 1322 : AttachSession(*(dsm_handle *) session_dsm_handle_space);
1431 : :
1432 : : /*
1433 : : * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
1434 : : * the leader has serialized the transaction snapshot and we must restore
1435 : : * it. At lower isolation levels, there is no transaction-lifetime
1436 : : * snapshot, but we need TransactionXmin to get set to a value which is
1437 : : * less than or equal to the xmin of every snapshot that will be used by
1438 : : * this worker. The easiest way to accomplish that is to install the
1439 : : * active snapshot as the transaction snapshot. Code running in this
1440 : : * parallel worker might take new snapshots via GetTransactionSnapshot()
1441 : : * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
1442 : : * snapshot older than the active snapshot.
1443 : : */
2505 tgl@sss.pgh.pa.us 1444 : 1322 : asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
963 rhaas@postgresql.org 1445 : 1322 : tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
1446 : 1322 : asnapshot = RestoreSnapshot(asnapspace);
1447 [ + + ]: 1322 : tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
1448 : 1322 : RestoreTransactionSnapshot(tsnapshot,
1449 : 1322 : fps->parallel_leader_pgproc);
1450 : 1322 : PushActiveSnapshot(asnapshot);
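/*
 * This is why the PARALLEL_KEY_TRANSACTION_SNAPSHOT lookup above passes
 * noError = true: the leader serializes a transaction snapshot only at
 * the higher isolation levels. A sketch of the leader-side logic (per
 * InitializeParallelDSM in this file):
 *
 *     if (IsolationUsesXactSnapshot())
 *     {
 *         SerializeSnapshot(transaction_snapshot, tsnapspace);
 *         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
 *                        tsnapspace);
 *     }
 */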
1451 : :
1452 : : /*
1453 : : * We've changed which tuples we can see, and must therefore invalidate
1454 : : * system caches.
1455 : : */
3103 1456 : 1322 : InvalidateSystemCaches();
1457 : :
1458 : : /*
1459 : : * Restore the current role id. We deliberately skip checking whether the
1460 : : * session user is allowed to become this role, and simply adopt the
1461 : : * leader's state for the current role.
1462 : : */
2359 1463 : 1322 : SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);
1464 : :
1465 : : /* Restore user ID and security context. */
3272 1466 : 1322 : SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1467 : :
1468 : : /* Restore temp-namespace state to ensure search path matches leader's. */
2866 tgl@sss.pgh.pa.us 1469 : 1322 : SetTempNamespaceState(fps->temp_namespace_id,
1470 : : fps->temp_toast_namespace_id);
1471 : :
1472 : : /* Restore pending syncs. */
1471 noah@leadboat.com 1473 : 1322 : pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
1474 : : false);
1475 : 1322 : RestorePendingSyncs(pendingsyncsspace);
1476 : :
1477 : : /* Restore reindex state. */
2277 rhaas@postgresql.org 1478 : 1322 : reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1479 : 1322 : RestoreReindexState(reindexspace);
1480 : :
1481 : : /* Restore relmapper state. */
2074 pg@bowt.ie 1482 : 1322 : relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
1483 : 1322 : RestoreRelationMap(relmapperspace);
1484 : :
1485 : : /* Restore uncommitted enums. */
1195 tmunro@postgresql.or 1486 : 1322 : uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
1487 : : false);
1488 : 1322 : RestoreUncommittedEnums(uncommittedenumsspace);
1489 : :
1490 : : /* Restore the ClientConnectionInfo. */
599 michael@paquier.xyz 1491 : 1322 : clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
1492 : : false);
1493 : 1322 : RestoreClientConnectionInfo(clientconninfospace);
1494 : :
1495 : : /*
1496 : : * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
1497 : : * ensure that auth_method is actually valid, i.e., that authn_id is not NULL.
1498 : : */
563 1499 [ + + ]: 1322 : if (MyClientConnectionInfo.authn_id)
1500 : 4 : InitializeSystemUser(MyClientConnectionInfo.authn_id,
1501 : : hba_authname(MyClientConnectionInfo.auth_method));
1502 : :
1503 : : /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
1857 tmunro@postgresql.or 1504 : 1322 : AttachSerializableXact(fps->serializable_xact_handle);
1505 : :
1506 : : /*
1507 : : * We've initialized all of our state now; nothing should change
1508 : : * hereafter.
1509 : : */
3103 rhaas@postgresql.org 1510 : 1322 : InitializingParallelWorker = false;
3272 1511 : 1322 : EnterParallelMode();
1512 : :
1513 : : /*
1514 : : * Time to do the real work: invoke the caller-supplied code.
1515 : : */
2557 tgl@sss.pgh.pa.us 1516 : 1322 : entrypt(seg, toc);
1517 : :
1518 : : /* Must exit parallel mode to pop active snapshot. */
3272 rhaas@postgresql.org 1519 : 1319 : ExitParallelMode();
1520 : :
1521 : : /* Must pop active snapshot so snapmgr.c doesn't complain. */
1522 : 1319 : PopActiveSnapshot();
1523 : :
1524 : : /* Shut down the parallel-worker transaction. */
1525 : 1319 : EndParallelWorkerTransaction();
1526 : :
1527 : : /* Detach from the per-session DSM segment. */
2404 andres@anarazel.de 1528 : 1319 : DetachSession();
1529 : :
1530 : : /* Report success. */
236 nathan@postgresql.or 1531 :GNC 1319 : pq_putmessage(PqMsg_Terminate, NULL, 0);
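/*
 * On the leader side, HandleParallelMessage() interprets this 'X'
 * (PqMsg_Terminate) message as a clean exit and shuts down the worker's
 * error queue. A rough sketch of that case (details may vary by version):
 *
 *     case 'X':            (Terminate: the worker exited cleanly)
 *         shm_mq_detach(pcxt->worker[i].error_mqh);
 *         pcxt->worker[i].error_mqh = NULL;
 *         break;
 */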
1532 : : }
1533 : :
1534 : : /*
1535 : : * Update shared memory with the ending location of the last WAL record we
1536 : : * wrote, if it's greater than the value already stored there.
1537 : : */
1538 : : void
3272 rhaas@postgresql.org 1539 :CBC 1319 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1540 : : {
1541 : 1319 : FixedParallelState *fps = MyFixedParallelState;
1542 : :
1543 [ - + ]: 1319 : Assert(fps != NULL);
1544 [ + + ]: 1319 : SpinLockAcquire(&fps->mutex);
1545 [ + + ]: 1319 : if (fps->last_xlog_end < last_xlog_end)
1546 : 64 : fps->last_xlog_end = last_xlog_end;
1547 : 1319 : SpinLockRelease(&fps->mutex);
1548 : 1319 : }
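/*
 * A sketch of the consuming side (per WaitForParallelWorkersToFinish in this
 * file): once the workers are done, the leader folds this value into its own
 * WAL position under the same mutex:
 *
 *     if (fps->last_xlog_end > XactLastRecEnd)
 *         XactLastRecEnd = fps->last_xlog_end;
 */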
1549 : :
1550 : : /*
1551 : : * Make sure the leader tries to read from our error queue one more time.
1552 : : * This guards against the case where we exit uncleanly without sending an
1553 : : * ErrorResponse to the leader, for example because some code calls proc_exit
1554 : : * directly.
1555 : : *
1556 : : * Also explicitly detach from dsm segment so that subsystems using
1557 : : * on_dsm_detach() have a chance to send stats before the stats subsystem is
1558 : : * shut down as part of a before_shmem_exit() hook.
1559 : : *
1560 : : * One might think this could instead be solved by carefully ordering the
1561 : : * attachment of DSM segments, so that the pgstats segments are detached
1562 : : * later than the parallel-query segment. That turns out not to work:
1563 : : * the stats hash might need to grow, which can cause new segments to be
1564 : : * allocated, and those would then be detached earlier.
1565 : : */
1566 : : static void
2273 1567 : 1322 : ParallelWorkerShutdown(int code, Datum arg)
1568 : : {
1400 andres@anarazel.de 1569 : 1322 : SendProcSignal(ParallelLeaderPid,
1570 : : PROCSIG_PARALLEL_MESSAGE,
1571 : : ParallelLeaderProcNumber);
1572 : :
1110 1573 : 1322 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
2273 rhaas@postgresql.org 1574 : 1322 : }
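/*
 * A sketch of what the SendProcSignal() above triggers in the leader (see
 * HandleParallelMessageInterrupt in this file): the handler just sets flags
 * and the latch, after which the leader drains our error queue one last time:
 *
 *     InterruptPending = true;
 *     ParallelMessagePending = true;
 *     SetLatch(MyLatch);
 */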
1575 : :
1576 : : /*
1577 : : * Look up (and possibly load) a parallel worker entry point function.
1578 : : *
1579 : : * For functions contained in the core code, we use library name "postgres"
1580 : : * and consult the InternalParallelWorkers array. External functions are
1581 : : * looked up, and loaded if necessary, using load_external_function().
1582 : : *
1583 : : * The point of this is to pass function names as strings across process
1584 : : * boundaries. We can't pass actual function addresses because of the
1585 : : * possibility that the function has been loaded at a different address
1586 : : * in a different process. This is obviously a hazard for functions in
1587 : : * loadable libraries, but it can happen even for functions in the core code
1588 : : * on platforms using EXEC_BACKEND (e.g., Windows).
1589 : : *
1590 : : * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1591 : : * in favor of applying load_external_function() for core functions too;
1592 : : * but that raises portability issues that are not worth addressing now.
1593 : : */
1594 : : static parallel_worker_main_type
2557 tgl@sss.pgh.pa.us 1595 : 1322 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1596 : : {
1597 : : /*
1598 : : * If the function is to be loaded from postgres itself, search the
1599 : : * InternalParallelWorkers array.
1600 : : */
1601 [ + - ]: 1322 : if (strcmp(libraryname, "postgres") == 0)
1602 : : {
1603 : : int i;
1604 : :
1605 [ + - ]: 1447 : for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1606 : : {
1607 [ + + ]: 1447 : if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1608 : 1322 : return InternalParallelWorkers[i].fn_addr;
1609 : : }
1610 : :
1611 : : /* We can only reach this by programming error. */
2557 tgl@sss.pgh.pa.us 1612 [ # # ]:UBC 0 : elog(ERROR, "internal function \"%s\" not found", funcname);
1613 : : }
1614 : :
1615 : : /* Otherwise load from external library. */
1616 : 0 : return (parallel_worker_main_type)
1617 : 0 : load_external_function(libraryname, funcname, true, NULL);
1618 : : }
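/*
 * A hypothetical example of the extension side of this contract (names are
 * illustrative, not from this file): the entry point must have the
 * parallel_worker_main_type signature, and the leader names it, as strings,
 * when creating the context, e.g.
 *
 *     PGDLLEXPORT void
 *     my_parallel_worker_main(dsm_segment *seg, shm_toc *toc)
 *     {
 *         char *state = shm_toc_lookup(toc, MY_EXT_KEY_STATE, false);
 *
 *         ...do the per-worker work based on "state"...
 *     }
 *
 *     pcxt = CreateParallelContext("my_extension", "my_parallel_worker_main",
 *                                  nworkers);
 *
 * so that the lookup above falls through to load_external_function().
 */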