Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execParallel.c
4 : : * Support routines for parallel execution.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * This file contains routines that are intended to support setting up,
10 : : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : : * executor. The ParallelContext machinery will handle starting the
12 : : * workers and ensuring that their state generally matches that of the
13 : : * leader; see src/backend/access/transam/README.parallel for details.
14 : : * However, we must save and restore relevant executor state, such as
15 : : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : : * the actual plan to be passed down to the worker.
17 : : *
18 : : * IDENTIFICATION
19 : : * src/backend/executor/execParallel.c
20 : : *
21 : : *-------------------------------------------------------------------------
22 : : */
23 : :
24 : : #include "postgres.h"
25 : :
26 : : #include "executor/execParallel.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/nodeAgg.h"
29 : : #include "executor/nodeAppend.h"
30 : : #include "executor/nodeBitmapHeapscan.h"
31 : : #include "executor/nodeCustom.h"
32 : : #include "executor/nodeForeignscan.h"
33 : : #include "executor/nodeHash.h"
34 : : #include "executor/nodeHashjoin.h"
35 : : #include "executor/nodeIncrementalSort.h"
36 : : #include "executor/nodeIndexonlyscan.h"
37 : : #include "executor/nodeIndexscan.h"
38 : : #include "executor/nodeMemoize.h"
39 : : #include "executor/nodeSeqscan.h"
40 : : #include "executor/nodeSort.h"
41 : : #include "executor/nodeSubplan.h"
42 : : #include "executor/tqueue.h"
43 : : #include "jit/jit.h"
44 : : #include "nodes/nodeFuncs.h"
45 : : #include "pgstat.h"
46 : : #include "tcop/tcopprot.h"
47 : : #include "utils/datum.h"
48 : : #include "utils/dsa.h"
49 : : #include "utils/lsyscache.h"
50 : : #include "utils/snapmgr.h"
51 : :
52 : : /*
53 : : * Magic numbers for parallel executor communication. We use constants
54 : : * greater than any 32-bit integer here so that values < 2^32 can be used
55 : : * by individual parallel nodes to store their own state.
56 : : */
57 : : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
58 : : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
59 : : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
60 : : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
61 : : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
62 : : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
63 : : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
64 : : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
65 : : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
66 : : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
67 : :
68 : : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
69 : :
70 : : /*
71 : : * Fixed-size random stuff that we need to pass to parallel workers.
72 : : */
73 : : typedef struct FixedParallelExecutorState
74 : : {
75 : : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
76 : : dsa_pointer param_exec;
77 : : int eflags;
78 : : int jit_flags;
79 : : } FixedParallelExecutorState;
80 : :
81 : : /*
82 : : * DSM structure for accumulating per-PlanState instrumentation.
83 : : *
84 : : * instrument_options: Same meaning here as in instrument.c.
85 : : *
86 : : * instrument_offset: Offset, relative to the start of this structure,
87 : : * of the first Instrumentation object. This will depend on the length of
88 : : * the plan_node_id array.
89 : : *
90 : : * num_workers: Number of workers.
91 : : *
92 : : * num_plan_nodes: Number of plan nodes.
93 : : *
94 : : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
95 : : * from parallel workers. The length of this array is given by num_plan_nodes.
96 : : */
97 : : struct SharedExecutorInstrumentation
98 : : {
99 : : int instrument_options;
100 : : int instrument_offset;
101 : : int num_workers;
102 : : int num_plan_nodes;
103 : : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
104 : : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
105 : : };
106 : : #define GetInstrumentationArray(sei) \
107 : : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
108 : : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
109 : :
110 : : /* Context object for ExecParallelEstimate. */
111 : : typedef struct ExecParallelEstimateContext
112 : : {
113 : : ParallelContext *pcxt;
114 : : int nnodes;
115 : : } ExecParallelEstimateContext;
116 : :
117 : : /* Context object for ExecParallelInitializeDSM. */
118 : : typedef struct ExecParallelInitializeDSMContext
119 : : {
120 : : ParallelContext *pcxt;
121 : : SharedExecutorInstrumentation *instrumentation;
122 : : int nnodes;
123 : : } ExecParallelInitializeDSMContext;
124 : :
125 : : /* Helper functions that run in the parallel leader. */
126 : : static char *ExecSerializePlan(Plan *plan, EState *estate);
127 : : static bool ExecParallelEstimate(PlanState *planstate,
128 : : ExecParallelEstimateContext *e);
129 : : static bool ExecParallelInitializeDSM(PlanState *planstate,
130 : : ExecParallelInitializeDSMContext *d);
131 : : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
132 : : bool reinitialize);
133 : : static bool ExecParallelReInitializeDSM(PlanState *planstate,
134 : : ParallelContext *pcxt);
135 : : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
136 : : SharedExecutorInstrumentation *instrumentation);
137 : :
138 : : /* Helper function that runs in the parallel worker. */
139 : : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
140 : :
141 : : /*
142 : : * Create a serialized representation of the plan to be sent to each worker.
143 : : */
144 : : static char *
3119 rhaas@postgresql.org 145 :CBC 332 : ExecSerializePlan(Plan *plan, EState *estate)
146 : : {
147 : : PlannedStmt *pstmt;
148 : : ListCell *lc;
149 : :
150 : : /* We can't scribble on the original plan, so make a copy. */
3121 151 : 332 : plan = copyObject(plan);
152 : :
153 : : /*
154 : : * The worker will start its own copy of the executor, and that copy will
155 : : * insert a junk filter if the toplevel node has any resjunk entries. We
156 : : * don't want that to happen, because while resjunk columns shouldn't be
157 : : * sent back to the user, here the tuples are coming back to another
158 : : * backend which may very well need them. So mutate the target list
159 : : * accordingly. This is sort of a hack; there might be better ways to do
160 : : * this...
161 : : */
2559 tgl@sss.pgh.pa.us 162 [ + + + + : 911 : foreach(lc, plan->targetlist)
+ + ]
163 : : {
164 : 579 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
165 : :
3121 rhaas@postgresql.org 166 : 579 : tle->resjunk = false;
167 : : }
168 : :
169 : : /*
170 : : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
171 : : * for our purposes, but the worker will need at least a minimal
172 : : * PlannedStmt to start the executor.
173 : : */
174 : 332 : pstmt = makeNode(PlannedStmt);
175 : 332 : pstmt->commandType = CMD_SELECT;
1090 bruce@momjian.us 176 : 332 : pstmt->queryId = pgstat_get_my_query_id();
2830 tgl@sss.pgh.pa.us 177 : 332 : pstmt->hasReturning = false;
178 : 332 : pstmt->hasModifyingCTE = false;
179 : 332 : pstmt->canSetTag = true;
180 : 332 : pstmt->transientPlan = false;
181 : 332 : pstmt->dependsOnRole = false;
182 : 332 : pstmt->parallelModeNeeded = false;
3121 rhaas@postgresql.org 183 : 332 : pstmt->planTree = plan;
3119 184 : 332 : pstmt->rtable = estate->es_range_table;
495 alvherre@alvh.no-ip. 185 : 332 : pstmt->permInfos = estate->es_rteperminfos;
3121 rhaas@postgresql.org 186 : 332 : pstmt->resultRelations = NIL;
1586 tgl@sss.pgh.pa.us 187 : 332 : pstmt->appendRelations = NIL;
188 : :
189 : : /*
190 : : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
191 : : * for unsafe ones (so that the list indexes of the safe ones are
192 : : * preserved). This positively ensures that the worker won't try to run,
193 : : * or even do ExecInitNode on, an unsafe subplan. That's important to
194 : : * protect, eg, non-parallel-aware FDWs from getting into trouble.
195 : : */
2559 196 : 332 : pstmt->subplans = NIL;
197 [ + + + + : 359 : foreach(lc, estate->es_plannedstmt->subplans)
+ + ]
198 : : {
199 : 27 : Plan *subplan = (Plan *) lfirst(lc);
200 : :
201 [ + - + + ]: 27 : if (subplan && !subplan->parallel_safe)
202 : 6 : subplan = NULL;
203 : 27 : pstmt->subplans = lappend(pstmt->subplans, subplan);
204 : : }
205 : :
3121 rhaas@postgresql.org 206 : 332 : pstmt->rewindPlanIDs = NULL;
207 : 332 : pstmt->rowMarks = NIL;
208 : 332 : pstmt->relationOids = NIL;
209 : 332 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
2344 210 : 332 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
2647 tgl@sss.pgh.pa.us 211 : 332 : pstmt->utilityStmt = NULL;
212 : 332 : pstmt->stmt_location = -1;
213 : 332 : pstmt->stmt_len = -1;
214 : :
215 : : /* Return serialized copy of our dummy PlannedStmt. */
3121 rhaas@postgresql.org 216 : 332 : return nodeToString(pstmt);
217 : : }
218 : :
219 : : /*
220 : : * Parallel-aware plan nodes (and occasionally others) may need some state
221 : : * which is shared across all parallel workers. Before we size the DSM, give
222 : : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
223 : : * &pcxt->estimator.
224 : : *
225 : : * While we're at it, count the number of PlanState nodes in the tree, so
226 : : * we know how many Instrumentation structures we need.
227 : : */
228 : : static bool
229 : 1468 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
230 : : {
231 [ - + ]: 1468 : if (planstate == NULL)
3121 rhaas@postgresql.org 232 :UBC 0 : return false;
233 : :
234 : : /* Count this node. */
3121 rhaas@postgresql.org 235 :CBC 1468 : e->nnodes++;
236 : :
2420 237 [ + + + - : 1468 : switch (nodeTag(planstate))
+ - + + +
+ - + +
+ ]
238 : : {
239 : 569 : case T_SeqScanState:
240 [ + + ]: 569 : if (planstate->plan->parallel_aware)
3007 241 : 450 : ExecSeqScanEstimate((SeqScanState *) planstate,
242 : : e->pcxt);
2420 243 : 569 : break;
244 : 153 : case T_IndexScanState:
245 [ + + ]: 153 : if (planstate->plan->parallel_aware)
2615 246 : 9 : ExecIndexScanEstimate((IndexScanState *) planstate,
247 : : e->pcxt);
2420 248 : 153 : break;
249 : 26 : case T_IndexOnlyScanState:
250 [ + + ]: 26 : if (planstate->plan->parallel_aware)
2611 251 : 20 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
252 : : e->pcxt);
2420 253 : 26 : break;
2420 rhaas@postgresql.org 254 :UBC 0 : case T_ForeignScanState:
255 [ # # ]: 0 : if (planstate->plan->parallel_aware)
2993 256 : 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
257 : : e->pcxt);
2420 258 : 0 : break;
2322 rhaas@postgresql.org 259 :CBC 93 : case T_AppendState:
260 [ + + ]: 93 : if (planstate->plan->parallel_aware)
261 : 69 : ExecAppendEstimate((AppendState *) planstate,
262 : : e->pcxt);
263 : 93 : break;
2420 rhaas@postgresql.org 264 :UBC 0 : case T_CustomScanState:
265 [ # # ]: 0 : if (planstate->plan->parallel_aware)
2993 266 : 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
267 : : e->pcxt);
2420 268 : 0 : break;
2420 rhaas@postgresql.org 269 :CBC 10 : case T_BitmapHeapScanState:
270 [ + + ]: 10 : if (planstate->plan->parallel_aware)
2594 271 : 9 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
272 : : e->pcxt);
2420 273 : 10 : break;
2307 andres@anarazel.de 274 : 96 : case T_HashJoinState:
275 [ + + ]: 96 : if (planstate->plan->parallel_aware)
276 : 60 : ExecHashJoinEstimate((HashJoinState *) planstate,
277 : : e->pcxt);
278 : 96 : break;
2322 279 : 96 : case T_HashState:
280 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
281 : 96 : ExecHashEstimate((HashState *) planstate, e->pcxt);
282 : 96 : break;
2420 rhaas@postgresql.org 283 : 76 : case T_SortState:
284 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
285 : 76 : ExecSortEstimate((SortState *) planstate, e->pcxt);
2419 tgl@sss.pgh.pa.us 286 : 76 : break;
1469 tomas.vondra@postgre 287 :UBC 0 : case T_IncrementalSortState:
288 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
289 : 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
290 : 0 : break;
1395 drowley@postgresql.o 291 :CBC 283 : case T_AggState:
292 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
293 : 283 : ExecAggEstimate((AggState *) planstate, e->pcxt);
294 : 283 : break;
1005 295 : 3 : case T_MemoizeState:
296 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
297 : 3 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
1108 298 : 3 : break;
2420 rhaas@postgresql.org 299 : 63 : default:
300 : 63 : break;
301 : : }
302 : :
3121 303 : 1468 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
304 : : }
305 : :
306 : : /*
307 : : * Estimate the amount of space required to serialize the indicated parameters.
308 : : */
309 : : static Size
2341 310 : 12 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
311 : : {
312 : : int paramid;
313 : 12 : Size sz = sizeof(int);
314 : :
315 : 12 : paramid = -1;
316 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
317 : : {
318 : : Oid typeOid;
319 : : int16 typLen;
320 : : bool typByVal;
321 : : ParamExecData *prm;
322 : :
323 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
324 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
325 : : paramid);
326 : :
327 : 15 : sz = add_size(sz, sizeof(int)); /* space for paramid */
328 : :
329 : : /* space for datum/isnull */
330 [ + - ]: 15 : if (OidIsValid(typeOid))
331 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
332 : : else
333 : : {
334 : : /* If no type OID, assume by-value, like copyParamList does. */
2341 rhaas@postgresql.org 335 :UBC 0 : typLen = sizeof(Datum);
336 : 0 : typByVal = true;
337 : : }
2341 rhaas@postgresql.org 338 :CBC 15 : sz = add_size(sz,
339 : 15 : datumEstimateSpace(prm->value, prm->isnull,
340 : : typByVal, typLen));
341 : : }
342 : 12 : return sz;
343 : : }
344 : :
345 : : /*
346 : : * Serialize specified PARAM_EXEC parameters.
347 : : *
348 : : * We write the number of parameters first, as a 4-byte integer, and then
349 : : * write details for each parameter in turn. The details for each parameter
350 : : * consist of a 4-byte paramid (location of param in execution time internal
351 : : * parameter array) and then the datum as serialized by datumSerialize().
352 : : */
353 : : static dsa_pointer
2309 354 : 12 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
355 : : {
356 : : Size size;
357 : : int nparams;
358 : : int paramid;
359 : : ParamExecData *prm;
360 : : dsa_pointer handle;
361 : : char *start_address;
362 : :
363 : : /* Allocate enough space for the current parameter values. */
2341 364 : 12 : size = EstimateParamExecSpace(estate, params);
2309 365 : 12 : handle = dsa_allocate(area, size);
366 : 12 : start_address = dsa_get_address(area, handle);
367 : :
368 : : /* First write the number of parameters as a 4-byte integer. */
2341 369 : 12 : nparams = bms_num_members(params);
370 : 12 : memcpy(start_address, &nparams, sizeof(int));
371 : 12 : start_address += sizeof(int);
372 : :
373 : : /* Write details for each parameter in turn. */
374 : 12 : paramid = -1;
375 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
376 : : {
377 : : Oid typeOid;
378 : : int16 typLen;
379 : : bool typByVal;
380 : :
381 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
382 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
383 : : paramid);
384 : :
385 : : /* Write paramid. */
386 : 15 : memcpy(start_address, ¶mid, sizeof(int));
387 : 15 : start_address += sizeof(int);
388 : :
389 : : /* Write datum/isnull */
390 [ + - ]: 15 : if (OidIsValid(typeOid))
391 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
392 : : else
393 : : {
394 : : /* If no type OID, assume by-value, like copyParamList does. */
2341 rhaas@postgresql.org 395 :UBC 0 : typLen = sizeof(Datum);
396 : 0 : typByVal = true;
397 : : }
2341 rhaas@postgresql.org 398 :CBC 15 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
399 : : &start_address);
400 : : }
401 : :
402 : 12 : return handle;
403 : : }
404 : :
405 : : /*
406 : : * Restore specified PARAM_EXEC parameters.
407 : : */
408 : : static void
409 : 36 : RestoreParamExecParams(char *start_address, EState *estate)
410 : : {
411 : : int nparams;
412 : : int i;
413 : : int paramid;
414 : :
415 : 36 : memcpy(&nparams, start_address, sizeof(int));
416 : 36 : start_address += sizeof(int);
417 : :
418 [ + + ]: 78 : for (i = 0; i < nparams; i++)
419 : : {
420 : : ParamExecData *prm;
421 : :
422 : : /* Read paramid */
423 : 42 : memcpy(¶mid, start_address, sizeof(int));
424 : 42 : start_address += sizeof(int);
425 : 42 : prm = &(estate->es_param_exec_vals[paramid]);
426 : :
427 : : /* Read datum/isnull. */
428 : 42 : prm->value = datumRestore(&start_address, &prm->isnull);
429 : 42 : prm->execPlan = NULL;
430 : : }
431 : 36 : }
432 : :
433 : : /*
434 : : * Initialize the dynamic shared memory segment that will be used to control
435 : : * parallel execution.
436 : : */
437 : : static bool
3121 438 : 1468 : ExecParallelInitializeDSM(PlanState *planstate,
439 : : ExecParallelInitializeDSMContext *d)
440 : : {
441 [ - + ]: 1468 : if (planstate == NULL)
3121 rhaas@postgresql.org 442 :UBC 0 : return false;
443 : :
444 : : /* If instrumentation is enabled, initialize slot for this node. */
3121 rhaas@postgresql.org 445 [ + + ]:CBC 1468 : if (d->instrumentation != NULL)
3049 446 : 516 : d->instrumentation->plan_node_id[d->nnodes] =
447 : 516 : planstate->plan->plan_node_id;
448 : :
449 : : /* Count this node. */
3121 450 : 1468 : d->nnodes++;
451 : :
452 : : /*
453 : : * Call initializers for DSM-using plan nodes.
454 : : *
455 : : * Most plan nodes won't do anything here, but plan nodes that allocated
456 : : * DSM may need to initialize shared state in the DSM before parallel
457 : : * workers are launched. They can allocate the space they previously
458 : : * estimated using shm_toc_allocate, and add the keys they previously
459 : : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
460 : : */
2420 461 [ + + + - : 1468 : switch (nodeTag(planstate))
+ - + + +
+ - + +
+ ]
462 : : {
463 : 569 : case T_SeqScanState:
464 [ + + ]: 569 : if (planstate->plan->parallel_aware)
3007 465 : 450 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
466 : : d->pcxt);
2420 467 : 569 : break;
468 : 153 : case T_IndexScanState:
469 [ + + ]: 153 : if (planstate->plan->parallel_aware)
2615 470 : 9 : ExecIndexScanInitializeDSM((IndexScanState *) planstate,
471 : : d->pcxt);
2420 472 : 153 : break;
473 : 26 : case T_IndexOnlyScanState:
474 [ + + ]: 26 : if (planstate->plan->parallel_aware)
2611 475 : 20 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
476 : : d->pcxt);
2420 477 : 26 : break;
2420 rhaas@postgresql.org 478 :UBC 0 : case T_ForeignScanState:
479 [ # # ]: 0 : if (planstate->plan->parallel_aware)
2993 480 : 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
481 : : d->pcxt);
2420 482 : 0 : break;
2322 rhaas@postgresql.org 483 :CBC 93 : case T_AppendState:
484 [ + + ]: 93 : if (planstate->plan->parallel_aware)
485 : 69 : ExecAppendInitializeDSM((AppendState *) planstate,
486 : : d->pcxt);
487 : 93 : break;
2420 rhaas@postgresql.org 488 :UBC 0 : case T_CustomScanState:
489 [ # # ]: 0 : if (planstate->plan->parallel_aware)
2993 490 : 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
491 : : d->pcxt);
2420 492 : 0 : break;
2420 rhaas@postgresql.org 493 :CBC 10 : case T_BitmapHeapScanState:
494 [ + + ]: 10 : if (planstate->plan->parallel_aware)
2594 495 : 9 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
496 : : d->pcxt);
2420 497 : 10 : break;
2307 andres@anarazel.de 498 : 96 : case T_HashJoinState:
499 [ + + ]: 96 : if (planstate->plan->parallel_aware)
500 : 60 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
501 : : d->pcxt);
502 : 96 : break;
2322 503 : 96 : case T_HashState:
504 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
505 : 96 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
506 : 96 : break;
2420 rhaas@postgresql.org 507 : 76 : case T_SortState:
508 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
509 : 76 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
2419 tgl@sss.pgh.pa.us 510 : 76 : break;
1469 tomas.vondra@postgre 511 :UBC 0 : case T_IncrementalSortState:
512 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
513 : 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
514 : 0 : break;
1395 drowley@postgresql.o 515 :CBC 283 : case T_AggState:
516 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
517 : 283 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
518 : 283 : break;
1005 519 : 3 : case T_MemoizeState:
520 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
521 : 3 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
1108 522 : 3 : break;
2420 rhaas@postgresql.org 523 : 63 : default:
524 : 63 : break;
525 : : }
526 : :
3121 527 : 1468 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
528 : : }
529 : :
530 : : /*
531 : : * It sets up the response queues for backend workers to return tuples
532 : : * to the main backend and start the workers.
533 : : */
534 : : static shm_mq_handle **
3089 535 : 461 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
536 : : {
537 : : shm_mq_handle **responseq;
538 : : char *tqueuespace;
539 : : int i;
540 : :
541 : : /* Skip this if no workers. */
3121 542 [ - + ]: 461 : if (pcxt->nworkers == 0)
3121 rhaas@postgresql.org 543 :UBC 0 : return NULL;
544 : :
545 : : /* Allocate memory for shared memory queue handles. */
546 : : responseq = (shm_mq_handle **)
3121 rhaas@postgresql.org 547 :CBC 461 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
548 : :
549 : : /*
550 : : * If not reinitializing, allocate space from the DSM for the queues;
551 : : * otherwise, find the already allocated space.
552 : : */
3089 553 [ + + ]: 461 : if (!reinitialize)
554 : : tqueuespace =
555 : 332 : shm_toc_allocate(pcxt->toc,
556 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
2900 557 : 332 : pcxt->nworkers));
558 : : else
2505 tgl@sss.pgh.pa.us 559 : 129 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
560 : :
561 : : /* Create the queues, and become the receiver for each. */
3121 rhaas@postgresql.org 562 [ + + ]: 1730 : for (i = 0; i < pcxt->nworkers; ++i)
563 : : {
564 : : shm_mq *mq;
565 : :
2900 566 : 1269 : mq = shm_mq_create(tqueuespace +
567 : 1269 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
568 : : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
569 : :
3121 570 : 1269 : shm_mq_set_receiver(mq, MyProc);
571 : 1269 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
572 : : }
573 : :
574 : : /* Add array of queues to shm_toc, so others can find it. */
3089 575 [ + + ]: 461 : if (!reinitialize)
576 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
577 : :
578 : : /* Return array of handles. */
3121 579 : 461 : return responseq;
580 : : }
581 : :
582 : : /*
583 : : * Sets up the required infrastructure for backend workers to perform
584 : : * execution and return results to the main backend.
585 : : */
586 : : ParallelExecutorInfo *
2341 587 : 332 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
588 : : Bitmapset *sendParams, int nworkers,
589 : : int64 tuples_needed)
590 : : {
591 : : ParallelExecutorInfo *pei;
592 : : ParallelContext *pcxt;
593 : : ExecParallelEstimateContext e;
594 : : ExecParallelInitializeDSMContext d;
595 : : FixedParallelExecutorState *fpes;
596 : : char *pstmt_data;
597 : : char *pstmt_space;
598 : : char *paramlistinfo_space;
599 : : BufferUsage *bufusage_space;
600 : : WalUsage *walusage_space;
3121 601 : 332 : SharedExecutorInstrumentation *instrumentation = NULL;
2028 andres@anarazel.de 602 : 332 : SharedJitInstrumentation *jit_instrumentation = NULL;
603 : : int pstmt_len;
604 : : int paramlistinfo_len;
3121 rhaas@postgresql.org 605 : 332 : int instrumentation_len = 0;
2028 andres@anarazel.de 606 : 332 : int jit_instrumentation_len = 0;
3049 rhaas@postgresql.org 607 : 332 : int instrument_offset = 0;
2673 608 : 332 : Size dsa_minsize = dsa_minimum_size();
609 : : char *query_string;
610 : : int query_len;
611 : :
612 : : /*
613 : : * Force any initplan outputs that we're going to pass to workers to be
614 : : * evaluated, if they weren't already.
615 : : *
616 : : * For simplicity, we use the EState's per-output-tuple ExprContext here.
617 : : * That risks intra-query memory leakage, since we might pass through here
618 : : * many times before that ExprContext gets reset; but ExecSetParamPlan
619 : : * doesn't normally leak any memory in the context (see its comments), so
620 : : * it doesn't seem worth complicating this function's API to pass it a
621 : : * shorter-lived ExprContext. This might need to change someday.
622 : : */
2038 tgl@sss.pgh.pa.us 623 [ + + ]: 332 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
624 : :
625 : : /* Allocate object for return value. */
3121 rhaas@postgresql.org 626 : 332 : pei = palloc0(sizeof(ParallelExecutorInfo));
3070 627 : 332 : pei->finished = false;
3121 628 : 332 : pei->planstate = planstate;
629 : :
630 : : /* Fix up and serialize plan to be sent to workers. */
3119 631 : 332 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
632 : :
633 : : /* Create a parallel context. */
1857 tmunro@postgresql.or 634 : 332 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
3121 rhaas@postgresql.org 635 : 332 : pei->pcxt = pcxt;
636 : :
637 : : /*
638 : : * Before telling the parallel context to create a dynamic shared memory
639 : : * segment, we need to figure out how big it should be. Estimate space
640 : : * for the various things we need to store.
641 : : */
642 : :
643 : : /* Estimate space for fixed-size state. */
2420 644 : 332 : shm_toc_estimate_chunk(&pcxt->estimator,
645 : : sizeof(FixedParallelExecutorState));
646 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
647 : :
648 : : /* Estimate space for query text. */
1095 tgl@sss.pgh.pa.us 649 : 332 : query_len = strlen(estate->es_sourceText);
2307 rhaas@postgresql.org 650 : 332 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
2608 651 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
652 : :
653 : : /* Estimate space for serialized PlannedStmt. */
3121 654 : 332 : pstmt_len = strlen(pstmt_data) + 1;
655 : 332 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
656 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
657 : :
658 : : /* Estimate space for serialized ParamListInfo. */
2341 659 : 332 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
660 : 332 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
3121 661 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
662 : :
663 : : /*
664 : : * Estimate space for BufferUsage.
665 : : *
666 : : * If EXPLAIN is not in use and there are no extensions loaded that care,
667 : : * we could skip this. But we have no way of knowing whether anyone's
668 : : * looking at pgBufferUsage, so do it unconditionally.
669 : : */
670 : 332 : shm_toc_estimate_chunk(&pcxt->estimator,
671 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
672 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
673 : :
674 : : /*
675 : : * Same thing for WalUsage.
676 : : */
1471 akapila@postgresql.o 677 : 332 : shm_toc_estimate_chunk(&pcxt->estimator,
678 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
679 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
680 : :
681 : : /* Estimate space for tuple queues. */
3121 rhaas@postgresql.org 682 : 332 : shm_toc_estimate_chunk(&pcxt->estimator,
683 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
684 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
685 : :
686 : : /*
687 : : * Give parallel-aware nodes a chance to add to the estimates, and get a
688 : : * count of how many PlanState nodes there are.
689 : : */
690 : 332 : e.pcxt = pcxt;
691 : 332 : e.nnodes = 0;
692 : 332 : ExecParallelEstimate(planstate, &e);
693 : :
694 : : /* Estimate space for instrumentation, if required. */
695 [ + + ]: 332 : if (estate->es_instrument)
696 : : {
697 : 90 : instrumentation_len =
698 : : offsetof(SharedExecutorInstrumentation, plan_node_id) +
2903 699 : 90 : sizeof(int) * e.nnodes;
3049 700 : 90 : instrumentation_len = MAXALIGN(instrumentation_len);
701 : 90 : instrument_offset = instrumentation_len;
2900 702 : 90 : instrumentation_len +=
703 : 90 : mul_size(sizeof(Instrumentation),
704 : 90 : mul_size(e.nnodes, nworkers));
3121 705 : 90 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
706 : 90 : shm_toc_estimate_keys(&pcxt->estimator, 1);
707 : :
708 : : /* Estimate space for JIT instrumentation, if required. */
2028 andres@anarazel.de 709 [ + + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
710 : : {
711 : 12 : jit_instrumentation_len =
712 : 12 : offsetof(SharedJitInstrumentation, jit_instr) +
713 : : sizeof(JitInstrumentation) * nworkers;
714 : 12 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
715 : 12 : shm_toc_estimate_keys(&pcxt->estimator, 1);
716 : : }
717 : : }
718 : :
719 : : /* Estimate space for DSA area. */
2673 rhaas@postgresql.org 720 : 332 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
721 : 332 : shm_toc_estimate_keys(&pcxt->estimator, 1);
722 : :
723 : : /*
724 : : * InitializeParallelDSM() passes the active snapshot to the parallel
725 : : * worker, which uses it to set es_snapshot. Make sure we don't set
726 : : * es_snapshot differently in the child.
727 : : */
31 heikki.linnakangas@i 728 [ - + ]:GNC 332 : Assert(GetActiveSnapshot() == estate->es_snapshot);
729 : :
730 : : /* Everyone's had a chance to ask for space, so now create the DSM. */
3121 rhaas@postgresql.org 731 :CBC 332 : InitializeParallelDSM(pcxt);
732 : :
733 : : /*
734 : : * OK, now we have a dynamic shared memory segment, and it should be big
735 : : * enough to store all of the data we estimated we would want to put into
736 : : * it, plus whatever general stuff (not specifically executor-related) the
737 : : * ParallelContext itself needs to store there. None of the space we
738 : : * asked for has been allocated or initialized yet, though, so do that.
739 : : */
740 : :
741 : : /* Store fixed-size state. */
2420 742 : 332 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
743 : 332 : fpes->tuples_needed = tuples_needed;
2341 744 : 332 : fpes->param_exec = InvalidDsaPointer;
2337 745 : 332 : fpes->eflags = estate->es_top_eflags;
2215 andres@anarazel.de 746 : 332 : fpes->jit_flags = estate->es_jit_flags;
2420 rhaas@postgresql.org 747 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
748 : :
749 : : /* Store query string */
2307 750 : 332 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
1095 tgl@sss.pgh.pa.us 751 : 332 : memcpy(query_string, estate->es_sourceText, query_len + 1);
2608 rhaas@postgresql.org 752 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
753 : :
754 : : /* Store serialized PlannedStmt. */
3121 755 : 332 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
756 : 332 : memcpy(pstmt_space, pstmt_data, pstmt_len);
757 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
758 : :
759 : : /* Store serialized ParamListInfo. */
2341 760 : 332 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
761 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
762 : 332 : SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
763 : :
764 : : /* Allocate space for each worker's BufferUsage; no need to initialize. */
3121 765 : 332 : bufusage_space = shm_toc_allocate(pcxt->toc,
2489 tgl@sss.pgh.pa.us 766 : 332 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
3121 rhaas@postgresql.org 767 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
768 : 332 : pei->buffer_usage = bufusage_space;
769 : :
770 : : /* Same for WalUsage. */
1471 akapila@postgresql.o 771 : 332 : walusage_space = shm_toc_allocate(pcxt->toc,
772 : 332 : mul_size(sizeof(WalUsage), pcxt->nworkers));
773 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
774 : 332 : pei->wal_usage = walusage_space;
775 : :
776 : : /* Set up the tuple queues that the workers will write into. */
3089 rhaas@postgresql.org 777 : 332 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
778 : :
779 : : /* We don't need the TupleQueueReaders yet, though. */
2417 tgl@sss.pgh.pa.us 780 : 332 : pei->reader = NULL;
781 : :
782 : : /*
783 : : * If instrumentation options were supplied, allocate space for the data.
784 : : * It only gets partially initialized here; the rest happens during
785 : : * ExecParallelInitializeDSM.
786 : : */
3121 rhaas@postgresql.org 787 [ + + ]: 332 : if (estate->es_instrument)
788 : : {
789 : : Instrumentation *instrument;
790 : : int i;
791 : :
792 : 90 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
793 : 90 : instrumentation->instrument_options = estate->es_instrument;
3049 794 : 90 : instrumentation->instrument_offset = instrument_offset;
795 : 90 : instrumentation->num_workers = nworkers;
796 : 90 : instrumentation->num_plan_nodes = e.nnodes;
797 : 90 : instrument = GetInstrumentationArray(instrumentation);
798 [ + + ]: 933 : for (i = 0; i < nworkers * e.nnodes; ++i)
799 : 843 : InstrInit(&instrument[i], estate->es_instrument);
3121 800 : 90 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
801 : : instrumentation);
802 : 90 : pei->instrumentation = instrumentation;
803 : :
2028 andres@anarazel.de 804 [ + + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
805 : : {
806 : 12 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
807 : : jit_instrumentation_len);
808 : 12 : jit_instrumentation->num_workers = nworkers;
809 : 12 : memset(jit_instrumentation->jit_instr, 0,
810 : : sizeof(JitInstrumentation) * nworkers);
811 : 12 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
812 : : jit_instrumentation);
813 : 12 : pei->jit_instrumentation = jit_instrumentation;
814 : : }
815 : : }
816 : :
817 : : /*
818 : : * Create a DSA area that can be used by the leader and all workers.
819 : : * (However, if we failed to create a DSM and are using private memory
820 : : * instead, then skip this.)
821 : : */
2673 rhaas@postgresql.org 822 [ + - ]: 332 : if (pcxt->seg != NULL)
823 : : {
824 : : char *area_space;
825 : :
826 : 332 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
827 : 332 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
828 : 332 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
829 : : LWTRANCHE_PARALLEL_QUERY_DSA,
830 : : pcxt->seg);
831 : :
832 : : /*
833 : : * Serialize parameters, if any, using DSA storage. We don't dare use
834 : : * the main parallel query DSM for this because we might relaunch
835 : : * workers after the values have changed (and thus the amount of
836 : : * storage required has changed).
837 : : */
2341 838 [ + + ]: 332 : if (!bms_is_empty(sendParams))
839 : : {
2309 840 : 12 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
841 : : pei->area);
2341 842 : 12 : fpes->param_exec = pei->param_exec;
843 : : }
844 : : }
845 : :
846 : : /*
847 : : * Give parallel-aware nodes a chance to initialize their shared data.
848 : : * This also initializes the elements of instrumentation->ps_instrument,
849 : : * if it exists.
850 : : */
3121 851 : 332 : d.pcxt = pcxt;
852 : 332 : d.instrumentation = instrumentation;
853 : 332 : d.nnodes = 0;
854 : :
855 : : /* Install our DSA area while initializing the plan. */
2309 856 : 332 : estate->es_query_dsa = pei->area;
3121 857 : 332 : ExecParallelInitializeDSM(planstate, &d);
2309 858 : 332 : estate->es_query_dsa = NULL;
859 : :
860 : : /*
861 : : * Make sure that the world hasn't shifted under our feet. This could
862 : : * probably just be an Assert(), but let's be conservative for now.
863 : : */
3121 864 [ - + ]: 332 : if (e.nnodes != d.nnodes)
3121 rhaas@postgresql.org 865 [ # # ]:UBC 0 : elog(ERROR, "inconsistent count of PlanState nodes");
866 : :
867 : : /* OK, we're ready to rock and roll. */
3121 rhaas@postgresql.org 868 :CBC 332 : return pei;
869 : : }
870 : :
871 : : /*
872 : : * Set up tuple queue readers to read the results of a parallel subplan.
873 : : *
874 : : * This is separate from ExecInitParallelPlan() because we can launch the
875 : : * worker processes and let them start doing something before we do this.
876 : : */
877 : : void
2404 andres@anarazel.de 878 : 452 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
879 : : {
2417 tgl@sss.pgh.pa.us 880 : 452 : int nworkers = pei->pcxt->nworkers_launched;
881 : : int i;
882 : :
883 [ - + ]: 452 : Assert(pei->reader == NULL);
884 : :
885 [ + - ]: 452 : if (nworkers > 0)
886 : : {
887 : 452 : pei->reader = (TupleQueueReader **)
888 : 452 : palloc(nworkers * sizeof(TupleQueueReader *));
889 : :
890 [ + + ]: 1684 : for (i = 0; i < nworkers; i++)
891 : : {
892 : 1232 : shm_mq_set_handle(pei->tqueue[i],
893 : 1232 : pei->pcxt->worker[i].bgwhandle);
2404 andres@anarazel.de 894 : 1232 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
895 : : }
896 : : }
2417 tgl@sss.pgh.pa.us 897 : 452 : }
898 : :
899 : : /*
900 : : * Re-initialize the parallel executor shared memory state before launching
901 : : * a fresh batch of workers.
902 : : */
903 : : void
2419 904 : 129 : ExecParallelReinitialize(PlanState *planstate,
905 : : ParallelExecutorInfo *pei,
906 : : Bitmapset *sendParams)
907 : : {
2341 rhaas@postgresql.org 908 : 129 : EState *estate = planstate->state;
909 : : FixedParallelExecutorState *fpes;
910 : :
911 : : /* Old workers must already be shut down */
2419 tgl@sss.pgh.pa.us 912 [ - + ]: 129 : Assert(pei->finished);
913 : :
914 : : /*
915 : : * Force any initplan outputs that we're going to pass to workers to be
916 : : * evaluated, if they weren't already (see comments in
917 : : * ExecInitParallelPlan).
918 : : */
2038 919 [ + - ]: 129 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
920 : :
2419 921 : 129 : ReinitializeParallelDSM(pei->pcxt);
922 : 129 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
2417 923 : 129 : pei->reader = NULL;
2419 924 : 129 : pei->finished = false;
925 : :
2341 rhaas@postgresql.org 926 : 129 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
927 : :
928 : : /* Free any serialized parameters from the last round. */
929 [ - + ]: 129 : if (DsaPointerIsValid(fpes->param_exec))
930 : : {
2309 rhaas@postgresql.org 931 :UBC 0 : dsa_free(pei->area, fpes->param_exec);
2341 932 : 0 : fpes->param_exec = InvalidDsaPointer;
933 : : }
934 : :
935 : : /* Serialize current parameter values if required. */
2341 rhaas@postgresql.org 936 [ - + ]:CBC 129 : if (!bms_is_empty(sendParams))
937 : : {
2309 rhaas@postgresql.org 938 :UBC 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
939 : : pei->area);
2341 940 : 0 : fpes->param_exec = pei->param_exec;
941 : : }
942 : :
943 : : /* Traverse plan tree and let each child node reset associated state. */
2309 rhaas@postgresql.org 944 :CBC 129 : estate->es_query_dsa = pei->area;
2419 tgl@sss.pgh.pa.us 945 : 129 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
2309 rhaas@postgresql.org 946 : 129 : estate->es_query_dsa = NULL;
2419 tgl@sss.pgh.pa.us 947 : 129 : }
948 : :
949 : : /*
950 : : * Traverse plan tree to reinitialize per-node dynamic shared memory state
951 : : */
952 : : static bool
953 : 333 : ExecParallelReInitializeDSM(PlanState *planstate,
954 : : ParallelContext *pcxt)
955 : : {
956 [ - + ]: 333 : if (planstate == NULL)
2419 tgl@sss.pgh.pa.us 957 :UBC 0 : return false;
958 : :
959 : : /*
960 : : * Call reinitializers for DSM-using plan nodes.
961 : : */
2419 tgl@sss.pgh.pa.us 962 [ + + + - :CBC 333 : switch (nodeTag(planstate))
- - + + +
+ ]
963 : : {
964 : 138 : case T_SeqScanState:
965 [ + + ]: 138 : if (planstate->plan->parallel_aware)
966 : 114 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
967 : : pcxt);
968 : 138 : break;
969 : 6 : case T_IndexScanState:
970 [ + - ]: 6 : if (planstate->plan->parallel_aware)
971 : 6 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
972 : : pcxt);
973 : 6 : break;
974 : 6 : case T_IndexOnlyScanState:
975 [ + - ]: 6 : if (planstate->plan->parallel_aware)
976 : 6 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
977 : : pcxt);
978 : 6 : break;
2419 tgl@sss.pgh.pa.us 979 :UBC 0 : case T_ForeignScanState:
980 [ # # ]: 0 : if (planstate->plan->parallel_aware)
981 : 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
982 : : pcxt);
983 : 0 : break;
2322 rhaas@postgresql.org 984 : 0 : case T_AppendState:
985 [ # # ]: 0 : if (planstate->plan->parallel_aware)
986 : 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
987 : 0 : break;
2419 tgl@sss.pgh.pa.us 988 : 0 : case T_CustomScanState:
989 [ # # ]: 0 : if (planstate->plan->parallel_aware)
990 : 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
991 : : pcxt);
992 : 0 : break;
2419 tgl@sss.pgh.pa.us 993 :CBC 27 : case T_BitmapHeapScanState:
994 [ + - ]: 27 : if (planstate->plan->parallel_aware)
995 : 27 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
996 : : pcxt);
997 : 27 : break;
2307 andres@anarazel.de 998 : 48 : case T_HashJoinState:
999 [ + + ]: 48 : if (planstate->plan->parallel_aware)
1000 : 24 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1001 : : pcxt);
1002 : 48 : break;
2322 1003 : 63 : case T_HashState:
1004 : : case T_SortState:
1005 : : case T_IncrementalSortState:
1006 : : case T_MemoizeState:
1007 : : /* these nodes have DSM state, but no reinitialization is required */
2419 tgl@sss.pgh.pa.us 1008 : 63 : break;
1009 : :
1010 : 45 : default:
1011 : 45 : break;
1012 : : }
1013 : :
1014 : 333 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1015 : : }
1016 : :
1017 : : /*
1018 : : * Copy instrumentation information about this node and its descendants from
1019 : : * dynamic shared memory.
1020 : : */
1021 : : static bool
3121 rhaas@postgresql.org 1022 : 516 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1023 : : SharedExecutorInstrumentation *instrumentation)
1024 : : {
1025 : : Instrumentation *instrument;
1026 : : int i;
1027 : : int n;
1028 : : int ibytes;
2866 1029 : 516 : int plan_node_id = planstate->plan->plan_node_id;
1030 : : MemoryContext oldcontext;
1031 : :
1032 : : /* Find the instrumentation for this node. */
3049 1033 [ + - ]: 2334 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1034 [ + + ]: 2334 : if (instrumentation->plan_node_id[i] == plan_node_id)
3121 1035 : 516 : break;
3049 1036 [ - + ]: 516 : if (i >= instrumentation->num_plan_nodes)
3121 rhaas@postgresql.org 1037 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1038 : :
1039 : : /* Accumulate the statistics from all workers. */
3049 rhaas@postgresql.org 1040 :CBC 516 : instrument = GetInstrumentationArray(instrumentation);
1041 : 516 : instrument += i * instrumentation->num_workers;
1042 [ + + ]: 1359 : for (n = 0; n < instrumentation->num_workers; ++n)
1043 : 843 : InstrAggNode(planstate->instrument, &instrument[n]);
1044 : :
1045 : : /*
1046 : : * Also store the per-worker detail.
1047 : : *
1048 : : * Worker instrumentation should be allocated in the same context as the
1049 : : * regular instrumentation information, which is the per-query context.
1050 : : * Switch into per-query memory context.
1051 : : */
2314 1052 : 516 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1053 : 516 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1054 : 516 : planstate->worker_instrument =
1055 : 516 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1056 : 516 : MemoryContextSwitchTo(oldcontext);
1057 : :
3049 1058 : 516 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
2314 1059 : 516 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1060 : :
1061 : : /* Perform any node-type-specific work that needs to be done. */
2322 andres@anarazel.de 1062 [ + - + + : 516 : switch (nodeTag(planstate))
- + ]
1063 : : {
1064 : 6 : case T_SortState:
1065 : 6 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1066 : 6 : break;
1469 tomas.vondra@postgre 1067 :UBC 0 : case T_IncrementalSortState:
1068 : 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1069 : 0 : break;
2322 andres@anarazel.de 1070 :CBC 42 : case T_HashState:
1071 : 42 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1072 : 42 : break;
1395 drowley@postgresql.o 1073 : 54 : case T_AggState:
1074 : 54 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1075 : 54 : break;
1005 drowley@postgresql.o 1076 :UBC 0 : case T_MemoizeState:
1077 : 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1108 1078 : 0 : break;
2322 andres@anarazel.de 1079 :CBC 414 : default:
1080 : 414 : break;
1081 : : }
1082 : :
3121 rhaas@postgresql.org 1083 : 516 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1084 : : instrumentation);
1085 : : }
1086 : :
1087 : : /*
1088 : : * Add up the workers' JIT instrumentation from dynamic shared memory.
1089 : : */
1090 : : static void
2028 andres@anarazel.de 1091 : 12 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1092 : : SharedJitInstrumentation *shared_jit)
1093 : : {
1094 : : JitInstrumentation *combined;
1095 : : int ibytes;
1096 : :
1097 : : int n;
1098 : :
1099 : : /*
1100 : : * Accumulate worker JIT instrumentation into the combined JIT
1101 : : * instrumentation, allocating it if required.
1102 : : */
2020 1103 [ + - ]: 12 : if (!planstate->state->es_jit_worker_instr)
1104 : 12 : planstate->state->es_jit_worker_instr =
2028 1105 : 12 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
2020 1106 : 12 : combined = planstate->state->es_jit_worker_instr;
1107 : :
1108 : : /* Accumulate all the workers' instrumentations. */
2028 1109 [ + + ]: 36 : for (n = 0; n < shared_jit->num_workers; ++n)
1110 : 24 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1111 : :
1112 : : /*
1113 : : * Store the per-worker detail.
1114 : : *
1115 : : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1116 : : * instrumentation in per-query context.
1117 : : */
1118 : 12 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1789 tgl@sss.pgh.pa.us 1119 : 12 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
2028 andres@anarazel.de 1120 : 12 : planstate->worker_jit_instrument =
1121 : 12 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1122 : :
1123 : 12 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1124 : 12 : }
1125 : :
1126 : : /*
1127 : : * Finish parallel execution. We wait for parallel workers to finish, and
1128 : : * accumulate their buffer/WAL usage.
1129 : : */
1130 : : void
3121 rhaas@postgresql.org 1131 : 835 : ExecParallelFinish(ParallelExecutorInfo *pei)
1132 : : {
2417 tgl@sss.pgh.pa.us 1133 : 835 : int nworkers = pei->pcxt->nworkers_launched;
1134 : : int i;
1135 : :
1136 : : /* Make this be a no-op if called twice in a row. */
3070 rhaas@postgresql.org 1137 [ + + ]: 835 : if (pei->finished)
1138 : 377 : return;
1139 : :
1140 : : /*
1141 : : * Detach from tuple queues ASAP, so that any still-active workers will
1142 : : * notice that no further results are wanted.
1143 : : */
2417 tgl@sss.pgh.pa.us 1144 [ + - ]: 458 : if (pei->tqueue != NULL)
1145 : : {
1146 [ + + ]: 1687 : for (i = 0; i < nworkers; i++)
1147 : 1229 : shm_mq_detach(pei->tqueue[i]);
1148 : 458 : pfree(pei->tqueue);
1149 : 458 : pei->tqueue = NULL;
1150 : : }
1151 : :
1152 : : /*
1153 : : * While we're waiting for the workers to finish, let's get rid of the
1154 : : * tuple queue readers. (Any other local cleanup could be done here too.)
1155 : : */
1156 [ + + ]: 458 : if (pei->reader != NULL)
1157 : : {
1158 [ + + ]: 1678 : for (i = 0; i < nworkers; i++)
1159 : 1229 : DestroyTupleQueueReader(pei->reader[i]);
1160 : 449 : pfree(pei->reader);
1161 : 449 : pei->reader = NULL;
1162 : : }
1163 : :
1164 : : /* Now wait for the workers to finish. */
3121 rhaas@postgresql.org 1165 : 458 : WaitForParallelWorkersToFinish(pei->pcxt);
1166 : :
1167 : : /*
1168 : : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1169 : : * finish, or we might get incomplete data.)
1170 : : */
2417 tgl@sss.pgh.pa.us 1171 [ + + ]: 1687 : for (i = 0; i < nworkers; i++)
1471 akapila@postgresql.o 1172 : 1229 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1173 : :
3070 rhaas@postgresql.org 1174 : 458 : pei->finished = true;
1175 : : }
1176 : :
1177 : : /*
1178 : : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1179 : : * resources still exist after ExecParallelFinish. We separate these
1180 : : * routines because someone might want to examine the contents of the DSM
1181 : : * after ExecParallelFinish and before calling this routine.
1182 : : */
1183 : : void
3103 1184 : 329 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1185 : : {
1186 : : /* Accumulate instrumentation, if any. */
2308 1187 [ + + ]: 329 : if (pei->instrumentation)
1188 : 90 : ExecParallelRetrieveInstrumentation(pei->planstate,
1189 : : pei->instrumentation);
1190 : :
1191 : : /* Accumulate JIT instrumentation, if any. */
2028 andres@anarazel.de 1192 [ + + ]: 329 : if (pei->jit_instrumentation)
1193 : 12 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1789 tgl@sss.pgh.pa.us 1194 : 12 : pei->jit_instrumentation);
1195 : :
1196 : : /* Free any serialized parameters. */
2341 rhaas@postgresql.org 1197 [ + + ]: 329 : if (DsaPointerIsValid(pei->param_exec))
1198 : : {
1199 : 12 : dsa_free(pei->area, pei->param_exec);
1200 : 12 : pei->param_exec = InvalidDsaPointer;
1201 : : }
2673 1202 [ + - ]: 329 : if (pei->area != NULL)
1203 : : {
1204 : 329 : dsa_detach(pei->area);
1205 : 329 : pei->area = NULL;
1206 : : }
3103 1207 [ + - ]: 329 : if (pei->pcxt != NULL)
1208 : : {
1209 : 329 : DestroyParallelContext(pei->pcxt);
1210 : 329 : pei->pcxt = NULL;
1211 : : }
1212 : 329 : pfree(pei);
1213 : 329 : }
1214 : :
1215 : : /*
1216 : : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1217 : : * for that purpose.
1218 : : */
1219 : : static DestReceiver *
3121 1220 : 1232 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1221 : : {
1222 : : char *mqspace;
1223 : : shm_mq *mq;
1224 : :
2505 tgl@sss.pgh.pa.us 1225 : 1232 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
3121 rhaas@postgresql.org 1226 : 1232 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1227 : 1232 : mq = (shm_mq *) mqspace;
1228 : 1232 : shm_mq_set_sender(mq, MyProc);
1229 : 1232 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1230 : : }
1231 : :
1232 : : /*
1233 : : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1234 : : */
1235 : : static QueryDesc *
1236 : 1232 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1237 : : int instrument_options)
1238 : : {
1239 : : char *pstmtspace;
1240 : : char *paramspace;
1241 : : PlannedStmt *pstmt;
1242 : : ParamListInfo paramLI;
1243 : : char *queryString;
1244 : :
1245 : : /* Get the query string from shared memory */
2505 tgl@sss.pgh.pa.us 1246 : 1232 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1247 : :
1248 : : /* Reconstruct leader-supplied PlannedStmt. */
1249 : 1232 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
3121 rhaas@postgresql.org 1250 : 1232 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1251 : :
1252 : : /* Reconstruct ParamListInfo. */
2341 1253 : 1232 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
3121 1254 : 1232 : paramLI = RestoreParamList(¶mspace);
1255 : :
1256 : : /* Create a QueryDesc for the query. */
1257 : 1232 : return CreateQueryDesc(pstmt,
1258 : : queryString,
1259 : : GetActiveSnapshot(), InvalidSnapshot,
1260 : : receiver, paramLI, NULL, instrument_options);
1261 : : }
1262 : :
1263 : : /*
1264 : : * Copy instrumentation information from this node and its descendants into
1265 : : * dynamic shared memory, so that the parallel leader can retrieve it.
1266 : : */
1267 : : static bool
1268 : 1191 : ExecParallelReportInstrumentation(PlanState *planstate,
1269 : : SharedExecutorInstrumentation *instrumentation)
1270 : : {
1271 : : int i;
2866 1272 : 1191 : int plan_node_id = planstate->plan->plan_node_id;
1273 : : Instrumentation *instrument;
1274 : :
3049 1275 : 1191 : InstrEndLoop(planstate->instrument);
1276 : :
1277 : : /*
1278 : : * If we shuffled the plan_node_id values in ps_instrument into sorted
1279 : : * order, we could use binary search here. This might matter someday if
1280 : : * we're pushing down sufficiently large plan trees. For now, do it the
1281 : : * slow, dumb way.
1282 : : */
1283 [ + - ]: 3921 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1284 [ + + ]: 3921 : if (instrumentation->plan_node_id[i] == plan_node_id)
3121 1285 : 1191 : break;
3049 1286 [ - + ]: 1191 : if (i >= instrumentation->num_plan_nodes)
3121 rhaas@postgresql.org 1287 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1288 : :
1289 : : /*
1290 : : * Add our statistics to the per-node, per-worker totals. It's possible
1291 : : * that this could happen more than once if we relaunched workers.
1292 : : */
3049 rhaas@postgresql.org 1293 :CBC 1191 : instrument = GetInstrumentationArray(instrumentation);
1294 : 1191 : instrument += i * instrumentation->num_workers;
1295 [ - + ]: 1191 : Assert(IsParallelWorker());
1296 [ - + ]: 1191 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1297 : 1191 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1298 : :
3121 1299 : 1191 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1300 : : instrumentation);
1301 : : }
1302 : :
1303 : : /*
1304 : : * Initialize the PlanState and its descendants with the information
1305 : : * retrieved from shared memory. This has to be done once the PlanState
1306 : : * is allocated and initialized by executor; that is, after ExecutorStart().
1307 : : */
1308 : : static bool
2341 andres@anarazel.de 1309 : 4077 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1310 : : {
3077 rhaas@postgresql.org 1311 [ - + ]: 4077 : if (planstate == NULL)
3077 rhaas@postgresql.org 1312 :UBC 0 : return false;
1313 : :
2420 rhaas@postgresql.org 1314 [ + + + - :CBC 4077 : switch (nodeTag(planstate))
+ - + + +
+ - + +
+ ]
1315 : : {
1316 : 1647 : case T_SeqScanState:
1317 [ + + ]: 1647 : if (planstate->plan->parallel_aware)
2341 andres@anarazel.de 1318 : 1333 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
2420 rhaas@postgresql.org 1319 : 1647 : break;
1320 : 210 : case T_IndexScanState:
1321 [ + + ]: 210 : if (planstate->plan->parallel_aware)
2341 andres@anarazel.de 1322 : 60 : ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1323 : : pwcxt);
2420 rhaas@postgresql.org 1324 : 210 : break;
1325 : 118 : case T_IndexOnlyScanState:
1326 [ + + ]: 118 : if (planstate->plan->parallel_aware)
2341 andres@anarazel.de 1327 : 100 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1328 : : pwcxt);
2420 rhaas@postgresql.org 1329 : 118 : break;
2420 rhaas@postgresql.org 1330 :UBC 0 : case T_ForeignScanState:
1331 [ # # ]: 0 : if (planstate->plan->parallel_aware)
2993 1332 : 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1333 : : pwcxt);
2420 1334 : 0 : break;
2322 rhaas@postgresql.org 1335 :CBC 189 : case T_AppendState:
1336 [ + + ]: 189 : if (planstate->plan->parallel_aware)
1337 : 159 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1338 : 189 : break;
2420 rhaas@postgresql.org 1339 :UBC 0 : case T_CustomScanState:
1340 [ # # ]: 0 : if (planstate->plan->parallel_aware)
2993 1341 : 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1342 : : pwcxt);
2420 1343 : 0 : break;
2420 rhaas@postgresql.org 1344 :CBC 139 : case T_BitmapHeapScanState:
1345 [ + + ]: 139 : if (planstate->plan->parallel_aware)
2341 andres@anarazel.de 1346 : 138 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1347 : : pwcxt);
2420 rhaas@postgresql.org 1348 : 139 : break;
2307 andres@anarazel.de 1349 : 273 : case T_HashJoinState:
1350 [ + + ]: 273 : if (planstate->plan->parallel_aware)
1351 : 153 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1352 : : pwcxt);
1353 : 273 : break;
2322 1354 : 273 : case T_HashState:
1355 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1356 : 273 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1357 : 273 : break;
2420 rhaas@postgresql.org 1358 : 226 : case T_SortState:
1359 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
2341 andres@anarazel.de 1360 : 226 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
2419 tgl@sss.pgh.pa.us 1361 : 226 : break;
1469 tomas.vondra@postgre 1362 :UBC 0 : case T_IncrementalSortState:
1363 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1364 : 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1365 : : pwcxt);
1366 : 0 : break;
1395 drowley@postgresql.o 1367 :CBC 780 : case T_AggState:
1368 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1369 : 780 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1370 : 780 : break;
1005 1371 : 6 : case T_MemoizeState:
1372 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1373 : 6 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1108 1374 : 6 : break;
2420 rhaas@postgresql.org 1375 : 216 : default:
1376 : 216 : break;
1377 : : }
1378 : :
2341 andres@anarazel.de 1379 : 4077 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1380 : : pwcxt);
1381 : : }
1382 : :
1383 : : /*
1384 : : * Main entrypoint for parallel query worker processes.
1385 : : *
1386 : : * We reach this function from ParallelWorkerMain, so the setup necessary to
1387 : : * create a sensible parallel environment has already been done;
1388 : : * ParallelWorkerMain worries about stuff like the transaction state, combo
1389 : : * CID mappings, and GUC values, so we don't need to deal with any of that
1390 : : * here.
1391 : : *
1392 : : * Our job is to deal with concerns specific to the executor. The parallel
1393 : : * group leader will have stored a serialized PlannedStmt, and it's our job
1394 : : * to execute that plan and write the resulting tuples to the appropriate
1395 : : * tuple queue. Various bits of supporting information that we need in order
1396 : : * to do this are also stored in the dsm_segment and can be accessed through
1397 : : * the shm_toc.
1398 : : */
1399 : : void
3121 rhaas@postgresql.org 1400 : 1232 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1401 : : {
1402 : : FixedParallelExecutorState *fpes;
1403 : : BufferUsage *buffer_usage;
1404 : : WalUsage *wal_usage;
1405 : : DestReceiver *receiver;
1406 : : QueryDesc *queryDesc;
1407 : : SharedExecutorInstrumentation *instrumentation;
1408 : : SharedJitInstrumentation *jit_instrumentation;
1409 : 1232 : int instrument_options = 0;
1410 : : void *area_space;
1411 : : dsa_area *area;
1412 : : ParallelWorkerContext pwcxt;
1413 : :
1414 : : /* Get fixed-size state. */
2420 1415 : 1232 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1416 : :
1417 : : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
3121 1418 : 1232 : receiver = ExecParallelGetReceiver(seg, toc);
2505 tgl@sss.pgh.pa.us 1419 : 1232 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
3121 rhaas@postgresql.org 1420 [ + + ]: 1232 : if (instrumentation != NULL)
1421 : 363 : instrument_options = instrumentation->instrument_options;
2028 andres@anarazel.de 1422 : 1232 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1423 : : true);
3121 rhaas@postgresql.org 1424 : 1232 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1425 : :
1426 : : /* Setting debug_query_string for individual workers */
2608 1427 : 1232 : debug_query_string = queryDesc->sourceText;
1428 : :
1429 : : /* Report workers' query for monitoring purposes */
1430 : 1232 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1431 : :
1432 : : /* Attach to the dynamic shared memory area. */
2505 tgl@sss.pgh.pa.us 1433 : 1232 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
2673 rhaas@postgresql.org 1434 : 1232 : area = dsa_attach_in_place(area_space, seg);
1435 : :
1436 : : /* Start up the executor */
2215 andres@anarazel.de 1437 : 1232 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
2337 rhaas@postgresql.org 1438 : 1232 : ExecutorStart(queryDesc, fpes->eflags);
1439 : :
1440 : : /* Special executor initialization steps for parallel workers */
2673 1441 : 1232 : queryDesc->planstate->state->es_query_dsa = area;
2341 1442 [ + + ]: 1232 : if (DsaPointerIsValid(fpes->param_exec))
1443 : : {
1444 : : char *paramexec_space;
1445 : :
1446 : 36 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1447 : 36 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1448 : : }
andres@anarazel.de 1449 : 1232 : pwcxt.toc = toc;
1450 : 1232 : pwcxt.seg = seg;
1451 : 1232 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1452 : :
1453 : : /* Pass down any tuple bound */
2420 rhaas@postgresql.org 1454 : 1232 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1455 : :
1456 : : /*
1457 : : * Prepare to track buffer/WAL usage during query execution.
1458 : : *
1459 : : * We do this after starting up the executor to match what happens in the
1460 : : * leader, which also doesn't count buffer accesses and WAL activity that
1461 : : * occur during executor startup.
1462 : : */
2081 akapila@postgresql.o 1463 : 1232 : InstrStartParallelQuery();
1464 : :
1465 : : /*
1466 : : * Run the plan. If we specified a tuple bound, be careful not to demand
1467 : : * more tuples than that.
1468 : : */
2420 rhaas@postgresql.org 1469 : 1232 : ExecutorRun(queryDesc,
1470 : : ForwardScanDirection,
1471 : 1232 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1472 : : true);
1473 : :
1474 : : /* Shut down the executor */
3121 1475 : 1229 : ExecutorFinish(queryDesc);
1476 : :
1477 : : /* Report buffer/WAL usage during parallel execution. */
2505 tgl@sss.pgh.pa.us 1478 : 1229 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1471 akapila@postgresql.o 1479 : 1229 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1480 : 1229 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1481 : 1229 : &wal_usage[ParallelWorkerNumber]);
1482 : :
1483 : : /* Report instrumentation data if any instrumentation options are set. */
3121 rhaas@postgresql.org 1484 [ + + ]: 1229 : if (instrumentation != NULL)
1485 : 363 : ExecParallelReportInstrumentation(queryDesc->planstate,
1486 : : instrumentation);
1487 : :
1488 : : /* Report JIT instrumentation data if any */
2028 andres@anarazel.de 1489 [ - + - - ]: 1229 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1490 : : {
2028 andres@anarazel.de 1491 [ # # ]:UBC 0 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1492 : 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1493 : 0 : queryDesc->estate->es_jit->instr;
1494 : : }
1495 : :
1496 : : /* Must do this after capturing instrumentation. */
3119 rhaas@postgresql.org 1497 :CBC 1229 : ExecutorEnd(queryDesc);
1498 : :
1499 : : /* Cleanup. */
2673 1500 : 1229 : dsa_detach(area);
3121 1501 : 1229 : FreeQueryDesc(queryDesc);
2411 peter_e@gmx.net 1502 : 1229 : receiver->rDestroy(receiver);
3121 rhaas@postgresql.org 1503 : 1229 : }
|