Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execParallel.c
4 : * Support routines for parallel execution.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * This file contains routines that are intended to support setting up,
10 : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : * executor. The ParallelContext machinery will handle starting the
12 : * workers and ensuring that their state generally matches that of the
13 : * leader; see src/backend/access/transam/README.parallel for details.
14 : * However, we must save and restore relevant executor state, such as
15 : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : * the actual plan to be passed down to the worker.
17 : *
18 : * IDENTIFICATION
19 : * src/backend/executor/execParallel.c
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "executor/execParallel.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeAgg.h"
29 : #include "executor/nodeAppend.h"
30 : #include "executor/nodeBitmapHeapscan.h"
31 : #include "executor/nodeCustom.h"
32 : #include "executor/nodeForeignscan.h"
33 : #include "executor/nodeHash.h"
34 : #include "executor/nodeHashjoin.h"
35 : #include "executor/nodeIncrementalSort.h"
36 : #include "executor/nodeIndexonlyscan.h"
37 : #include "executor/nodeIndexscan.h"
38 : #include "executor/nodeMemoize.h"
39 : #include "executor/nodeSeqscan.h"
40 : #include "executor/nodeSort.h"
41 : #include "executor/nodeSubplan.h"
42 : #include "executor/tqueue.h"
43 : #include "jit/jit.h"
44 : #include "nodes/nodeFuncs.h"
45 : #include "pgstat.h"
46 : #include "storage/spin.h"
47 : #include "tcop/tcopprot.h"
48 : #include "utils/datum.h"
49 : #include "utils/dsa.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/snapmgr.h"
53 :
54 : /*
55 : * Magic numbers for parallel executor communication. We use constants
56 : * greater than any 32-bit integer here so that values < 2^32 can be used
57 : * by individual parallel nodes to store their own state.
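 *
 * (Parallel-aware plan nodes conventionally use their plan_node_id as the
 * toc key for that per-node state; those IDs are small integers, so they
 * cannot collide with the keys defined here.)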
58 : */
59 : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
60 : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
61 : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
62 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
63 : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
64 : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
65 : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
66 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
67 : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
69 :
70 : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
71 :
72 : /*
73 : * Fixed-size random stuff that we need to pass to parallel workers.
74 : */
75 : typedef struct FixedParallelExecutorState
76 : {
77 : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
78 : dsa_pointer param_exec;
79 : int eflags;
80 : int jit_flags;
81 : } FixedParallelExecutorState;
82 :
83 : /*
84 : * DSM structure for accumulating per-PlanState instrumentation.
85 : *
86 : * instrument_options: Same meaning here as in instrument.c.
87 : *
88 : * instrument_offset: Offset, relative to the start of this structure,
89 : * of the first Instrumentation object. This will depend on the length of
90 : * the plan_node_id array.
91 : *
92 : * num_workers: Number of workers.
93 : *
94 : * num_plan_nodes: Number of plan nodes.
95 : *
 96 : * plan_node_id: Array of IDs of the plan nodes for which we are gathering instrumentation
97 : * from parallel workers. The length of this array is given by num_plan_nodes.
98 : */
99 : struct SharedExecutorInstrumentation
100 : {
101 : int instrument_options;
102 : int instrument_offset;
103 : int num_workers;
104 : int num_plan_nodes;
105 : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
106 : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
107 : };
108 : #define GetInstrumentationArray(sei) \
109 : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
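
/*
 * Given that layout, the Instrumentation slot for plan-node index i and
 * worker w is GetInstrumentationArray(sei)[i * sei->num_workers + w];
 * ExecParallelRetrieveInstrumentation and ExecParallelReportInstrumentation
 * below index the array in exactly this way.
 */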
111 :
112 : /* Context object for ExecParallelEstimate. */
113 : typedef struct ExecParallelEstimateContext
114 : {
115 : ParallelContext *pcxt;
116 : int nnodes;
117 : } ExecParallelEstimateContext;
118 :
119 : /* Context object for ExecParallelInitializeDSM. */
120 : typedef struct ExecParallelInitializeDSMContext
121 : {
122 : ParallelContext *pcxt;
123 : SharedExecutorInstrumentation *instrumentation;
124 : int nnodes;
125 : } ExecParallelInitializeDSMContext;
126 :
127 : /* Helper functions that run in the parallel leader. */
128 : static char *ExecSerializePlan(Plan *plan, EState *estate);
129 : static bool ExecParallelEstimate(PlanState *planstate,
130 : ExecParallelEstimateContext *e);
131 : static bool ExecParallelInitializeDSM(PlanState *planstate,
132 : ExecParallelInitializeDSMContext *d);
133 : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
134 : bool reinitialize);
135 : static bool ExecParallelReInitializeDSM(PlanState *planstate,
136 : ParallelContext *pcxt);
137 : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
138 : SharedExecutorInstrumentation *instrumentation);
139 :
140 : /* Helper function that runs in the parallel worker. */
141 : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
142 :
143 : /*
144 : * Create a serialized representation of the plan to be sent to each worker.
145 : */
146 : static char *
2748 rhaas 147 CBC 323 : ExecSerializePlan(Plan *plan, EState *estate)
148 : {
149 : PlannedStmt *pstmt;
150 : ListCell *lc;
151 :
152 : /* We can't scribble on the original plan, so make a copy. */
2750 153 323 : plan = copyObject(plan);
154 :
155 : /*
156 : * The worker will start its own copy of the executor, and that copy will
157 : * insert a junk filter if the toplevel node has any resjunk entries. We
158 : * don't want that to happen, because while resjunk columns shouldn't be
159 : * sent back to the user, here the tuples are coming back to another
160 : * backend which may very well need them. So mutate the target list
161 : * accordingly. This is sort of a hack; there might be better ways to do
162 : * this...
163 : */
2188 tgl 164 884 : foreach(lc, plan->targetlist)
165 : {
166 561 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
167 :
2750 rhaas 168 561 : tle->resjunk = false;
169 : }
170 :
171 : /*
172 : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
173 : * for our purposes, but the worker will need at least a minimal
174 : * PlannedStmt to start the executor.
175 : */
176 323 : pstmt = makeNode(PlannedStmt);
177 323 : pstmt->commandType = CMD_SELECT;
719 bruce 178 323 : pstmt->queryId = pgstat_get_my_query_id();
2459 tgl 179 323 : pstmt->hasReturning = false;
180 323 : pstmt->hasModifyingCTE = false;
181 323 : pstmt->canSetTag = true;
182 323 : pstmt->transientPlan = false;
183 323 : pstmt->dependsOnRole = false;
184 323 : pstmt->parallelModeNeeded = false;
2750 rhaas 185 323 : pstmt->planTree = plan;
129 alvherre 186 GNC 323 : pstmt->partPruneInfos = estate->es_part_prune_infos;
2748 rhaas 187 CBC 323 : pstmt->rtable = estate->es_range_table;
124 alvherre 188 GNC 323 : pstmt->permInfos = estate->es_rteperminfos;
2750 rhaas 189 CBC 323 : pstmt->resultRelations = NIL;
1215 tgl 190 323 : pstmt->appendRelations = NIL;
2188 tgl 191 ECB :
192 : /*
193 : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
194 : * for unsafe ones (so that the list indexes of the safe ones are
195 : * preserved). This positively ensures that the worker won't try to run,
196 : * or even do ExecInitNode on, an unsafe subplan. That's important to
197 : * protect, eg, non-parallel-aware FDWs from getting into trouble.
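 *
 * For example, if the second of three subplans is not parallel-safe, the
 * worker receives the list (subplan 1, NULL, subplan 3), so a SubPlan
 * whose plan_id is 3 still resolves to the correct list cell.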
198 : */
2188 tgl 199 GIC 323 : pstmt->subplans = NIL;
200 353 : foreach(lc, estate->es_plannedstmt->subplans)
2188 tgl 201 ECB : {
2188 tgl 202 CBC 30 : Plan *subplan = (Plan *) lfirst(lc);
203 :
204 30 : if (subplan && !subplan->parallel_safe)
2188 tgl 205 GIC 27 : subplan = NULL;
2188 tgl 206 CBC 30 : pstmt->subplans = lappend(pstmt->subplans, subplan);
2188 tgl 207 ECB : }
208 :
2750 rhaas 209 GIC 323 : pstmt->rewindPlanIDs = NULL;
210 323 : pstmt->rowMarks = NIL;
2750 rhaas 211 CBC 323 : pstmt->relationOids = NIL;
212 323 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
1973 213 323 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
2276 tgl 214 323 : pstmt->utilityStmt = NULL;
215 323 : pstmt->stmt_location = -1;
216 323 : pstmt->stmt_len = -1;
2750 rhaas 217 ECB :
218 : /* Return serialized copy of our dummy PlannedStmt. */
2750 rhaas 219 GIC 323 : return nodeToString(pstmt);
220 : }
2750 rhaas 221 ECB :
222 : /*
223 : * Parallel-aware plan nodes (and occasionally others) may need some state
224 : * which is shared across all parallel workers. Before we size the DSM, give
225 : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
226 : * &pcxt->estimator.
227 : *
228 : * While we're at it, count the number of PlanState nodes in the tree, so
229 : * we know how many Instrumentation structures we need.
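 *
 * For instance, ExecSeqScanEstimate reserves one chunk for the node's
 * shared parallel-scan state and one toc key, using exactly these two
 * calls on &pcxt->estimator.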
230 : */
231 : static bool
2750 rhaas 232 GIC 1417 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
233 : {
2750 rhaas 234 CBC 1417 : if (planstate == NULL)
2750 rhaas 235 UIC 0 : return false;
2750 rhaas 236 ECB :
2750 rhaas 237 EUB : /* Count this node. */
2750 rhaas 238 GIC 1417 : e->nnodes++;
239 :
2049 rhaas 240 CBC 1417 : switch (nodeTag(planstate))
241 : {
242 560 : case T_SeqScanState:
2049 rhaas 243 GIC 560 : if (planstate->plan->parallel_aware)
2636 rhaas 244 CBC 441 : ExecSeqScanEstimate((SeqScanState *) planstate,
2636 rhaas 245 ECB : e->pcxt);
2049 rhaas 246 CBC 560 : break;
2049 rhaas 247 GIC 144 : case T_IndexScanState:
2049 rhaas 248 CBC 144 : if (planstate->plan->parallel_aware)
2244 249 6 : ExecIndexScanEstimate((IndexScanState *) planstate,
2244 rhaas 250 ECB : e->pcxt);
2049 rhaas 251 CBC 144 : break;
2049 rhaas 252 GIC 26 : case T_IndexOnlyScanState:
2049 rhaas 253 CBC 26 : if (planstate->plan->parallel_aware)
2240 254 20 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
2240 rhaas 255 ECB : e->pcxt);
2049 rhaas 256 CBC 26 : break;
2049 rhaas 257 UIC 0 : case T_ForeignScanState:
2049 rhaas 258 LBC 0 : if (planstate->plan->parallel_aware)
2622 rhaas 259 UBC 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
2622 rhaas 260 EUB : e->pcxt);
2049 rhaas 261 UBC 0 : break;
1951 rhaas 262 GIC 90 : case T_AppendState:
1951 rhaas 263 GBC 90 : if (planstate->plan->parallel_aware)
1951 rhaas 264 CBC 66 : ExecAppendEstimate((AppendState *) planstate,
1951 rhaas 265 ECB : e->pcxt);
1951 rhaas 266 CBC 90 : break;
2049 rhaas 267 UIC 0 : case T_CustomScanState:
2049 rhaas 268 LBC 0 : if (planstate->plan->parallel_aware)
2622 rhaas 269 UBC 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
2622 rhaas 270 EUB : e->pcxt);
2049 rhaas 271 UBC 0 : break;
2049 rhaas 272 GIC 10 : case T_BitmapHeapScanState:
2049 rhaas 273 GBC 10 : if (planstate->plan->parallel_aware)
2223 rhaas 274 CBC 9 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
2223 rhaas 275 ECB : e->pcxt);
2049 rhaas 276 CBC 10 : break;
1936 andres 277 GIC 93 : case T_HashJoinState:
1936 andres 278 CBC 93 : if (planstate->plan->parallel_aware)
279 57 : ExecHashJoinEstimate((HashJoinState *) planstate,
1936 andres 280 ECB : e->pcxt);
1936 andres 281 CBC 93 : break;
1951 andres 282 GIC 93 : case T_HashState:
1951 andres 283 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1951 andres 284 CBC 93 : ExecHashEstimate((HashState *) planstate, e->pcxt);
1951 andres 285 GIC 93 : break;
2049 rhaas 286 CBC 70 : case T_SortState:
1951 andres 287 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
2049 rhaas 288 CBC 70 : ExecSortEstimate((SortState *) planstate, e->pcxt);
2048 tgl 289 GIC 70 : break;
1098 tomas.vondra 290 LBC 0 : case T_IncrementalSortState:
1098 tomas.vondra 291 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1098 tomas.vondra 292 UBC 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
1098 tomas.vondra 293 UIC 0 : break;
1024 drowley 294 GBC 280 : case T_AggState:
1024 drowley 295 EUB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1024 drowley 296 CBC 280 : ExecAggEstimate((AggState *) planstate, e->pcxt);
1024 drowley 297 GIC 280 : break;
634 drowley 298 CBC 3 : case T_MemoizeState:
737 drowley 299 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
634 drowley 300 CBC 3 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
737 drowley 301 GIC 3 : break;
2049 rhaas 302 CBC 48 : default:
303 48 : break;
2706 rhaas 304 ECB : }
2750 305 :
2750 rhaas 306 GIC 1417 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
307 : }
2750 rhaas 308 ECB :
309 : /*
310 : * Estimate the amount of space required to serialize the indicated parameters.
311 : */
312 : static Size
1970 rhaas 313 GIC 15 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
314 : {
1970 rhaas 315 ECB : int paramid;
1970 rhaas 316 GIC 15 : Size sz = sizeof(int);
317 :
1970 rhaas 318 CBC 15 : paramid = -1;
1970 rhaas 319 GIC 33 : while ((paramid = bms_next_member(params, paramid)) >= 0)
1970 rhaas 320 ECB : {
321 : Oid typeOid;
322 : int16 typLen;
323 : bool typByVal;
324 : ParamExecData *prm;
325 :
1970 rhaas 326 GIC 18 : prm = &(estate->es_param_exec_vals[paramid]);
327 18 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
1970 rhaas 328 ECB : paramid);
329 :
1970 rhaas 330 GIC 18 : sz = add_size(sz, sizeof(int)); /* space for paramid */
331 :
1970 rhaas 332 ECB : /* space for datum/isnull */
1970 rhaas 333 GIC 18 : if (OidIsValid(typeOid))
334 18 : get_typlenbyval(typeOid, &typLen, &typByVal);
1970 rhaas 335 ECB : else
336 : {
337 : /* If no type OID, assume by-value, like copyParamList does. */
1970 rhaas 338 UIC 0 : typLen = sizeof(Datum);
339 0 : typByVal = true;
1970 rhaas 340 EUB : }
1970 rhaas 341 GBC 18 : sz = add_size(sz,
1970 rhaas 342 GIC 18 : datumEstimateSpace(prm->value, prm->isnull,
1970 rhaas 343 ECB : typByVal, typLen));
344 : }
1970 rhaas 345 GIC 15 : return sz;
346 : }
1970 rhaas 347 ECB :
348 : /*
349 : * Serialize specified PARAM_EXEC parameters.
350 : *
351 : * We write the number of parameters first, as a 4-byte integer, and then
352 : * write details for each parameter in turn. The details for each parameter
353 : * consist of a 4-byte paramid (location of param in execution time internal
354 : * parameter array) and then the datum as serialized by datumSerialize().
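 *
 * For example, an image carrying two parameters is laid out as
 *   int32 nparams | int32 paramid | datum | int32 paramid | datum
 * with each datum in the format produced by datumSerialize().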
355 : */
356 : static dsa_pointer
1938 rhaas 357 GIC 15 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
358 : {
1970 rhaas 359 ECB : Size size;
360 : int nparams;
361 : int paramid;
362 : ParamExecData *prm;
363 : dsa_pointer handle;
364 : char *start_address;
365 :
366 : /* Allocate enough space for the current parameter values. */
1970 rhaas 367 GIC 15 : size = EstimateParamExecSpace(estate, params);
1938 368 15 : handle = dsa_allocate(area, size);
1938 rhaas 369 CBC 15 : start_address = dsa_get_address(area, handle);
1970 rhaas 370 ECB :
371 : /* First write the number of parameters as a 4-byte integer. */
1970 rhaas 372 GIC 15 : nparams = bms_num_members(params);
373 15 : memcpy(start_address, &nparams, sizeof(int));
1970 rhaas 374 CBC 15 : start_address += sizeof(int);
1970 rhaas 375 ECB :
376 : /* Write details for each parameter in turn. */
1970 rhaas 377 GIC 15 : paramid = -1;
378 33 : while ((paramid = bms_next_member(params, paramid)) >= 0)
1970 rhaas 379 ECB : {
380 : Oid typeOid;
381 : int16 typLen;
382 : bool typByVal;
383 :
1970 rhaas 384 GIC 18 : prm = &(estate->es_param_exec_vals[paramid]);
385 18 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
1970 rhaas 386 ECB : paramid);
387 :
388 : /* Write paramid. */
1970 rhaas 389 GIC 18 : memcpy(start_address, &paramid, sizeof(int));
390 18 : start_address += sizeof(int);
1970 rhaas 391 ECB :
392 : /* Write datum/isnull */
1970 rhaas 393 GIC 18 : if (OidIsValid(typeOid))
394 18 : get_typlenbyval(typeOid, &typLen, &typByVal);
1970 rhaas 395 ECB : else
396 : {
397 : /* If no type OID, assume by-value, like copyParamList does. */
1970 rhaas 398 UIC 0 : typLen = sizeof(Datum);
399 0 : typByVal = true;
1970 rhaas 400 EUB : }
1970 rhaas 401 GBC 18 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
402 : &start_address);
1970 rhaas 403 ECB : }
404 :
1970 rhaas 405 GIC 15 : return handle;
406 : }
1970 rhaas 407 ECB :
408 : /*
409 : * Restore specified PARAM_EXEC parameters.
410 : */
411 : static void
1970 rhaas 412 GIC 35 : RestoreParamExecParams(char *start_address, EState *estate)
413 : {
1970 rhaas 414 ECB : int nparams;
415 : int i;
416 : int paramid;
417 :
1970 rhaas 418 GIC 35 : memcpy(&nparams, start_address, sizeof(int));
419 35 : start_address += sizeof(int);
1970 rhaas 420 ECB :
1970 rhaas 421 CBC 75 : for (i = 0; i < nparams; i++)
422 : {
1970 rhaas 423 ECB : ParamExecData *prm;
424 :
425 : /* Read paramid */
1970 rhaas 426 GIC 40 : memcpy(&paramid, start_address, sizeof(int));
427 40 : start_address += sizeof(int);
1970 rhaas 428 CBC 40 : prm = &(estate->es_param_exec_vals[paramid]);
1970 rhaas 429 ECB :
430 : /* Read datum/isnull. */
1970 rhaas 431 GIC 40 : prm->value = datumRestore(&start_address, &prm->isnull);
432 40 : prm->execPlan = NULL;
1970 rhaas 433 ECB : }
1970 rhaas 434 CBC 35 : }
435 :
2750 rhaas 436 ECB : /*
437 : * Initialize the dynamic shared memory segment that will be used to control
438 : * parallel execution.
439 : */
440 : static bool
2750 rhaas 441 GIC 1417 : ExecParallelInitializeDSM(PlanState *planstate,
442 : ExecParallelInitializeDSMContext *d)
2750 rhaas 443 ECB : {
2750 rhaas 444 GIC 1417 : if (planstate == NULL)
2750 rhaas 445 UIC 0 : return false;
2750 rhaas 446 ECB :
2678 rhaas 447 EUB : /* If instrumentation is enabled, initialize slot for this node. */
2750 rhaas 448 GIC 1417 : if (d->instrumentation != NULL)
2678 449 507 : d->instrumentation->plan_node_id[d->nnodes] =
2678 rhaas 450 CBC 507 : planstate->plan->plan_node_id;
2750 rhaas 451 ECB :
452 : /* Count this node. */
2750 rhaas 453 GIC 1417 : d->nnodes++;
454 :
2636 rhaas 455 ECB : /*
456 : * Call initializers for DSM-using plan nodes.
457 : *
458 : * Most plan nodes won't do anything here, but plan nodes that allocated
459 : * DSM may need to initialize shared state in the DSM before parallel
460 : * workers are launched. They can allocate the space they previously
461 : * estimated using shm_toc_allocate, and add the keys they previously
462 : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
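 *
 * For example, ExecSeqScanInitializeDSM allocates the space it estimated
 * earlier with shm_toc_allocate, sets up its shared scan state there, and
 * publishes it with shm_toc_insert keyed by the node's plan_node_id.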
463 : */
2049 rhaas 464 GIC 1417 : switch (nodeTag(planstate))
465 : {
2049 rhaas 466 CBC 560 : case T_SeqScanState:
2049 rhaas 467 GIC 560 : if (planstate->plan->parallel_aware)
2636 rhaas 468 CBC 441 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
2636 rhaas 469 ECB : d->pcxt);
2049 rhaas 470 CBC 560 : break;
2049 rhaas 471 GIC 144 : case T_IndexScanState:
2049 rhaas 472 CBC 144 : if (planstate->plan->parallel_aware)
2244 473 6 : ExecIndexScanInitializeDSM((IndexScanState *) planstate,
2244 rhaas 474 ECB : d->pcxt);
2049 rhaas 475 CBC 144 : break;
2049 rhaas 476 GIC 26 : case T_IndexOnlyScanState:
2049 rhaas 477 CBC 26 : if (planstate->plan->parallel_aware)
2240 478 20 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
2240 rhaas 479 ECB : d->pcxt);
2049 rhaas 480 CBC 26 : break;
2049 rhaas 481 UIC 0 : case T_ForeignScanState:
2049 rhaas 482 LBC 0 : if (planstate->plan->parallel_aware)
2622 rhaas 483 UBC 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
2622 rhaas 484 EUB : d->pcxt);
2049 rhaas 485 UBC 0 : break;
1951 rhaas 486 GIC 90 : case T_AppendState:
1951 rhaas 487 GBC 90 : if (planstate->plan->parallel_aware)
1951 rhaas 488 CBC 66 : ExecAppendInitializeDSM((AppendState *) planstate,
1951 rhaas 489 ECB : d->pcxt);
1951 rhaas 490 CBC 90 : break;
2049 rhaas 491 UIC 0 : case T_CustomScanState:
2049 rhaas 492 LBC 0 : if (planstate->plan->parallel_aware)
2622 rhaas 493 UBC 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
2622 rhaas 494 EUB : d->pcxt);
2049 rhaas 495 UBC 0 : break;
2049 rhaas 496 GIC 10 : case T_BitmapHeapScanState:
2049 rhaas 497 GBC 10 : if (planstate->plan->parallel_aware)
2223 rhaas 498 CBC 9 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
2223 rhaas 499 ECB : d->pcxt);
2049 rhaas 500 CBC 10 : break;
1936 andres 501 GIC 93 : case T_HashJoinState:
1936 andres 502 CBC 93 : if (planstate->plan->parallel_aware)
503 57 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
1936 andres 504 ECB : d->pcxt);
1936 andres 505 CBC 93 : break;
1951 andres 506 GIC 93 : case T_HashState:
1951 andres 507 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1951 andres 508 CBC 93 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
1951 andres 509 GIC 93 : break;
2049 rhaas 510 CBC 70 : case T_SortState:
1951 andres 511 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
2049 rhaas 512 CBC 70 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
2048 tgl 513 GIC 70 : break;
1098 tomas.vondra 514 LBC 0 : case T_IncrementalSortState:
1098 tomas.vondra 515 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1098 tomas.vondra 516 UBC 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
1098 tomas.vondra 517 UIC 0 : break;
1024 drowley 518 GBC 280 : case T_AggState:
1024 drowley 519 EUB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1024 drowley 520 CBC 280 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
1024 drowley 521 GIC 280 : break;
634 drowley 522 CBC 3 : case T_MemoizeState:
737 drowley 523 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
634 drowley 524 CBC 3 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
737 drowley 525 GIC 3 : break;
2049 rhaas 526 CBC 48 : default:
527 48 : break;
2706 rhaas 528 ECB : }
2750 529 :
2750 rhaas 530 GIC 1417 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
531 : }
2750 rhaas 532 ECB :
533 : /*
534 : * Set up the response queues for backend workers to return tuples
535 : * to the main backend and start the workers.
536 : */
537 : static shm_mq_handle **
2718 rhaas 538 GIC 452 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
539 : {
2750 rhaas 540 ECB : shm_mq_handle **responseq;
541 : char *tqueuespace;
542 : int i;
543 :
544 : /* Skip this if no workers. */
2750 rhaas 545 GIC 452 : if (pcxt->nworkers == 0)
2750 rhaas 546 UIC 0 : return NULL;
2750 rhaas 547 ECB :
2750 rhaas 548 EUB : /* Allocate memory for shared memory queue handles. */
549 : responseq = (shm_mq_handle **)
2750 rhaas 550 GIC 452 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
551 :
2718 rhaas 552 ECB : /*
553 : * If not reinitializing, allocate space from the DSM for the queues;
554 : * otherwise, find the already allocated space.
555 : */
2718 rhaas 556 GIC 452 : if (!reinitialize)
557 : tqueuespace =
2718 rhaas 558 CBC 323 : shm_toc_allocate(pcxt->toc,
559 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
2529 560 323 : pcxt->nworkers));
561 : else
2134 tgl 562 129 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
563 :
2750 rhaas 564 ECB : /* Create the queues, and become the receiver for each. */
2750 rhaas 565 GIC 1700 : for (i = 0; i < pcxt->nworkers; ++i)
566 : {
2750 rhaas 567 ECB : shm_mq *mq;
568 :
2529 rhaas 569 GIC 1248 : mq = shm_mq_create(tqueuespace +
570 1248 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
2750 rhaas 571 ECB : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
572 :
2750 rhaas 573 GIC 1248 : shm_mq_set_receiver(mq, MyProc);
574 1248 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
2750 rhaas 575 ECB : }
576 :
577 : /* Add array of queues to shm_toc, so others can find it. */
2718 rhaas 578 GIC 452 : if (!reinitialize)
579 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
2750 rhaas 580 ECB :
581 : /* Return array of handles. */
2750 rhaas 582 GIC 452 : return responseq;
583 : }
2750 rhaas 584 ECB :
585 : /*
586 : * Sets up the required infrastructure for backend workers to perform
587 : * execution and return results to the main backend.
588 : */
589 : ParallelExecutorInfo *
1970 rhaas 590 GIC 323 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
591 : Bitmapset *sendParams, int nworkers,
2049 rhaas 592 ECB : int64 tuples_needed)
593 : {
594 : ParallelExecutorInfo *pei;
595 : ParallelContext *pcxt;
596 : ExecParallelEstimateContext e;
597 : ExecParallelInitializeDSMContext d;
598 : FixedParallelExecutorState *fpes;
599 : char *pstmt_data;
600 : char *pstmt_space;
601 : char *paramlistinfo_space;
602 : BufferUsage *bufusage_space;
603 : WalUsage *walusage_space;
2750 rhaas 604 GIC 323 : SharedExecutorInstrumentation *instrumentation = NULL;
1657 andres 605 323 : SharedJitInstrumentation *jit_instrumentation = NULL;
2750 rhaas 606 ECB : int pstmt_len;
1970 607 : int paramlistinfo_len;
2750 rhaas 608 GIC 323 : int instrumentation_len = 0;
1657 andres 609 323 : int jit_instrumentation_len = 0;
2678 rhaas 610 CBC 323 : int instrument_offset = 0;
2302 611 323 : Size dsa_minsize = dsa_minimum_size();
2237 rhaas 612 ECB : char *query_string;
613 : int query_len;
614 :
615 : /*
616 : * Force any initplan outputs that we're going to pass to workers to be
617 : * evaluated, if they weren't already.
618 : *
619 : * For simplicity, we use the EState's per-output-tuple ExprContext here.
620 : * That risks intra-query memory leakage, since we might pass through here
621 : * many times before that ExprContext gets reset; but ExecSetParamPlan
622 : * doesn't normally leak any memory in the context (see its comments), so
623 : * it doesn't seem worth complicating this function's API to pass it a
624 : * shorter-lived ExprContext. This might need to change someday.
625 : */
1667 tgl 626 GIC 323 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
627 :
2750 rhaas 628 ECB : /* Allocate object for return value. */
2750 rhaas 629 GIC 323 : pei = palloc0(sizeof(ParallelExecutorInfo));
2699 630 323 : pei->finished = false;
2750 rhaas 631 CBC 323 : pei->planstate = planstate;
2750 rhaas 632 ECB :
633 : /* Fix up and serialize plan to be sent to workers. */
2748 rhaas 634 GIC 323 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
635 :
2750 rhaas 636 ECB : /* Create a parallel context. */
1486 tmunro 637 GIC 323 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
2750 rhaas 638 323 : pei->pcxt = pcxt;
2750 rhaas 639 ECB :
640 : /*
641 : * Before telling the parallel context to create a dynamic shared memory
642 : * segment, we need to figure out how big it should be. Estimate space
643 : * for the various things we need to store.
644 : */
645 :
646 : /* Estimate space for fixed-size state. */
2049 rhaas 647 GIC 323 : shm_toc_estimate_chunk(&pcxt->estimator,
648 : sizeof(FixedParallelExecutorState));
2049 rhaas 649 CBC 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
650 :
2237 rhaas 651 ECB : /* Estimate space for query text. */
724 tgl 652 GIC 323 : query_len = strlen(estate->es_sourceText);
1936 rhaas 653 323 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
2237 rhaas 654 CBC 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2237 rhaas 655 ECB :
2750 656 : /* Estimate space for serialized PlannedStmt. */
2750 rhaas 657 GIC 323 : pstmt_len = strlen(pstmt_data) + 1;
658 323 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
2750 rhaas 659 CBC 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2750 rhaas 660 ECB :
661 : /* Estimate space for serialized ParamListInfo. */
1970 rhaas 662 GIC 323 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
663 323 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
2750 rhaas 664 CBC 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2750 rhaas 665 ECB :
666 : /*
667 : * Estimate space for BufferUsage.
668 : *
669 : * If EXPLAIN is not in use and there are no extensions loaded that care,
670 : * we could skip this. But we have no way of knowing whether anyone's
671 : * looking at pgBufferUsage, so do it unconditionally.
672 : */
2750 rhaas 673 GIC 323 : shm_toc_estimate_chunk(&pcxt->estimator,
674 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2750 rhaas 675 CBC 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
676 :
1100 akapila 677 ECB : /*
678 : * Same thing for WalUsage.
679 : */
1100 akapila 680 GIC 323 : shm_toc_estimate_chunk(&pcxt->estimator,
681 : mul_size(sizeof(WalUsage), pcxt->nworkers));
1100 akapila 682 CBC 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
683 :
2750 rhaas 684 ECB : /* Estimate space for tuple queues. */
2750 rhaas 685 GIC 323 : shm_toc_estimate_chunk(&pcxt->estimator,
686 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
2750 rhaas 687 CBC 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
688 :
2750 rhaas 689 ECB : /*
690 : * Give parallel-aware nodes a chance to add to the estimates, and get a
691 : * count of how many PlanState nodes there are.
692 : */
2750 rhaas 693 GIC 323 : e.pcxt = pcxt;
694 323 : e.nnodes = 0;
2750 rhaas 695 CBC 323 : ExecParallelEstimate(planstate, &e);
2750 rhaas 696 ECB :
697 : /* Estimate space for instrumentation, if required. */
2750 rhaas 698 GIC 323 : if (estate->es_instrument)
699 : {
2750 rhaas 700 CBC 93 : instrumentation_len =
701 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
2532 702 93 : sizeof(int) * e.nnodes;
2678 rhaas 703 GIC 93 : instrumentation_len = MAXALIGN(instrumentation_len);
2678 rhaas 704 CBC 93 : instrument_offset = instrumentation_len;
2529 705 93 : instrumentation_len +=
706 93 : mul_size(sizeof(Instrumentation),
707 93 : mul_size(e.nnodes, nworkers));
2750 708 93 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
709 93 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1657 andres 710 ECB :
711 : /* Estimate space for JIT instrumentation, if required. */
1657 andres 712 GIC 93 : if (estate->es_jit_flags != PGJIT_NONE)
713 : {
1657 andres 714 CBC 12 : jit_instrumentation_len =
1657 andres 715 GIC 12 : offsetof(SharedJitInstrumentation, jit_instr) +
1657 andres 716 ECB : sizeof(JitInstrumentation) * nworkers;
1657 andres 717 CBC 12 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
1657 andres 718 GIC 12 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1657 andres 719 ECB : }
2750 rhaas 720 : }
721 :
722 : /* Estimate space for DSA area. */
2302 rhaas 723 GIC 323 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
724 323 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2302 rhaas 725 ECB :
2750 726 : /* Everyone's had a chance to ask for space, so now create the DSM. */
2750 rhaas 727 GIC 323 : InitializeParallelDSM(pcxt);
728 :
2750 rhaas 729 ECB : /*
730 : * OK, now we have a dynamic shared memory segment, and it should be big
731 : * enough to store all of the data we estimated we would want to put into
732 : * it, plus whatever general stuff (not specifically executor-related) the
733 : * ParallelContext itself needs to store there. None of the space we
734 : * asked for has been allocated or initialized yet, though, so do that.
735 : */
736 :
737 : /* Store fixed-size state. */
2049 rhaas 738 GIC 323 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
739 323 : fpes->tuples_needed = tuples_needed;
1970 rhaas 740 CBC 323 : fpes->param_exec = InvalidDsaPointer;
1966 741 323 : fpes->eflags = estate->es_top_eflags;
1844 andres 742 323 : fpes->jit_flags = estate->es_jit_flags;
2049 rhaas 743 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
2049 rhaas 744 ECB :
2237 745 : /* Store query string */
1936 rhaas 746 GIC 323 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
724 tgl 747 323 : memcpy(query_string, estate->es_sourceText, query_len + 1);
2237 rhaas 748 CBC 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
2237 rhaas 749 ECB :
2750 750 : /* Store serialized PlannedStmt. */
2750 rhaas 751 GIC 323 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
752 323 : memcpy(pstmt_space, pstmt_data, pstmt_len);
2750 rhaas 753 CBC 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
2750 rhaas 754 ECB :
755 : /* Store serialized ParamListInfo. */
1970 rhaas 756 GIC 323 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
757 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
1970 rhaas 758 CBC 323 : SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
2750 rhaas 759 ECB :
760 : /* Allocate space for each worker's BufferUsage; no need to initialize. */
2750 rhaas 761 GIC 323 : bufusage_space = shm_toc_allocate(pcxt->toc,
2118 tgl 762 323 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2750 rhaas 763 CBC 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
764 323 : pei->buffer_usage = bufusage_space;
2750 rhaas 765 ECB :
1100 akapila 766 : /* Same for WalUsage. */
1100 akapila 767 GIC 323 : walusage_space = shm_toc_allocate(pcxt->toc,
768 323 : mul_size(sizeof(WalUsage), pcxt->nworkers));
1100 akapila 769 CBC 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
770 323 : pei->wal_usage = walusage_space;
1100 akapila 771 ECB :
2046 tgl 772 : /* Set up the tuple queues that the workers will write into. */
2718 rhaas 773 GIC 323 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
774 :
2046 tgl 775 ECB : /* We don't need the TupleQueueReaders yet, though. */
2046 tgl 776 GIC 323 : pei->reader = NULL;
777 :
2750 rhaas 778 ECB : /*
779 : * If instrumentation options were supplied, allocate space for the data.
780 : * It only gets partially initialized here; the rest happens during
781 : * ExecParallelInitializeDSM.
782 : */
2750 rhaas 783 GIC 323 : if (estate->es_instrument)
784 : {
2678 rhaas 785 ECB : Instrumentation *instrument;
786 : int i;
787 :
2750 rhaas 788 GIC 93 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
789 93 : instrumentation->instrument_options = estate->es_instrument;
2678 rhaas 790 CBC 93 : instrumentation->instrument_offset = instrument_offset;
791 93 : instrumentation->num_workers = nworkers;
792 93 : instrumentation->num_plan_nodes = e.nnodes;
793 93 : instrument = GetInstrumentationArray(instrumentation);
794 918 : for (i = 0; i < nworkers * e.nnodes; ++i)
795 825 : InstrInit(&instrument[i], estate->es_instrument);
2750 796 93 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
2750 rhaas 797 ECB : instrumentation);
2750 rhaas 798 CBC 93 : pei->instrumentation = instrumentation;
799 :
1657 andres 800 93 : if (estate->es_jit_flags != PGJIT_NONE)
801 : {
802 12 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
803 : jit_instrumentation_len);
804 12 : jit_instrumentation->num_workers = nworkers;
1657 andres 805 GIC 12 : memset(jit_instrumentation->jit_instr, 0,
1657 andres 806 ECB : sizeof(JitInstrumentation) * nworkers);
1657 andres 807 CBC 12 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
808 : jit_instrumentation);
809 12 : pei->jit_instrumentation = jit_instrumentation;
810 : }
2750 rhaas 811 ECB : }
812 :
813 : /*
814 : * Create a DSA area that can be used by the leader and all workers.
815 : * (However, if we failed to create a DSM and are using private memory
816 : * instead, then skip this.)
817 : */
2302 rhaas 818 GIC 323 : if (pcxt->seg != NULL)
819 : {
2302 rhaas 820 ECB : char *area_space;
821 :
2302 rhaas 822 GIC 323 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
823 323 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
2302 rhaas 824 CBC 323 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
2302 rhaas 825 ECB : LWTRANCHE_PARALLEL_QUERY_DSA,
826 : pcxt->seg);
827 :
828 : /*
829 : * Serialize parameters, if any, using DSA storage. We don't dare use
830 : * the main parallel query DSM for this because we might relaunch
831 : * workers after the values have changed (and thus the amount of
832 : * storage required has changed).
833 : */
1970 rhaas 834 GIC 323 : if (!bms_is_empty(sendParams))
835 : {
1938 rhaas 836 CBC 15 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
837 : pei->area);
1970 838 15 : fpes->param_exec = pei->param_exec;
839 : }
1970 rhaas 840 ECB : }
841 :
842 : /*
843 : * Give parallel-aware nodes a chance to initialize their shared data.
844 : * This also initializes the elements of instrumentation->ps_instrument,
845 : * if it exists.
846 : */
2750 rhaas 847 GIC 323 : d.pcxt = pcxt;
848 323 : d.instrumentation = instrumentation;
2750 rhaas 849 CBC 323 : d.nnodes = 0;
1938 rhaas 850 ECB :
851 : /* Install our DSA area while initializing the plan. */
1938 rhaas 852 GIC 323 : estate->es_query_dsa = pei->area;
2750 853 323 : ExecParallelInitializeDSM(planstate, &d);
1938 rhaas 854 CBC 323 : estate->es_query_dsa = NULL;
2750 rhaas 855 ECB :
856 : /*
857 : * Make sure that the world hasn't shifted under our feet. This could
858 : * probably just be an Assert(), but let's be conservative for now.
859 : */
2750 rhaas 860 GIC 323 : if (e.nnodes != d.nnodes)
2750 rhaas 861 UIC 0 : elog(ERROR, "inconsistent count of PlanState nodes");
2750 rhaas 862 ECB :
2750 rhaas 863 EUB : /* OK, we're ready to rock and roll. */
2750 rhaas 864 GIC 323 : return pei;
865 : }
2750 rhaas 866 ECB :
867 : /*
868 : * Set up tuple queue readers to read the results of a parallel subplan.
869 : *
870 : * This is separate from ExecInitParallelPlan() because we can launch the
871 : * worker processes and let them start doing something before we do this.
872 : */
873 : void
2033 andres 874 GIC 443 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
875 : {
2046 tgl 876 CBC 443 : int nworkers = pei->pcxt->nworkers_launched;
877 : int i;
2046 tgl 878 ECB :
2046 tgl 879 GIC 443 : Assert(pei->reader == NULL);
880 :
2046 tgl 881 CBC 443 : if (nworkers > 0)
882 : {
883 443 : pei->reader = (TupleQueueReader **)
2046 tgl 884 GIC 443 : palloc(nworkers * sizeof(TupleQueueReader *));
2046 tgl 885 ECB :
2046 tgl 886 CBC 1653 : for (i = 0; i < nworkers; i++)
887 : {
888 1210 : shm_mq_set_handle(pei->tqueue[i],
2046 tgl 889 GIC 1210 : pei->pcxt->worker[i].bgwhandle);
2033 andres 890 CBC 1210 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
2046 tgl 891 ECB : }
892 : }
2046 tgl 893 GIC 443 : }
894 :
2048 tgl 895 ECB : /*
896 : * Re-initialize the parallel executor shared memory state before launching
897 : * a fresh batch of workers.
898 : */
899 : void
2048 tgl 900 GIC 129 : ExecParallelReinitialize(PlanState *planstate,
901 : ParallelExecutorInfo *pei,
1970 rhaas 902 ECB : Bitmapset *sendParams)
903 : {
1970 rhaas 904 GIC 129 : EState *estate = planstate->state;
905 : FixedParallelExecutorState *fpes;
1970 rhaas 906 ECB :
907 : /* Old workers must already be shut down */
2048 tgl 908 GIC 129 : Assert(pei->finished);
909 :
1667 tgl 910 ECB : /*
911 : * Force any initplan outputs that we're going to pass to workers to be
912 : * evaluated, if they weren't already (see comments in
913 : * ExecInitParallelPlan).
914 : */
1667 tgl 915 GIC 129 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
916 :
2048 tgl 917 CBC 129 : ReinitializeParallelDSM(pei->pcxt);
2048 tgl 918 GIC 129 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
2046 tgl 919 CBC 129 : pei->reader = NULL;
2048 920 129 : pei->finished = false;
2048 tgl 921 ECB :
1970 rhaas 922 CBC 129 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
923 :
1970 rhaas 924 ECB : /* Free any serialized parameters from the last round. */
1970 rhaas 925 GIC 129 : if (DsaPointerIsValid(fpes->param_exec))
926 : {
1938 rhaas 927 LBC 0 : dsa_free(pei->area, fpes->param_exec);
1970 rhaas 928 UIC 0 : fpes->param_exec = InvalidDsaPointer;
1970 rhaas 929 EUB : }
930 :
931 : /* Serialize current parameter values if required. */
1970 rhaas 932 GIC 129 : if (!bms_is_empty(sendParams))
933 : {
1938 rhaas 934 LBC 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
935 : pei->area);
1970 rhaas 936 UBC 0 : fpes->param_exec = pei->param_exec;
937 : }
1970 rhaas 938 EUB :
939 : /* Traverse plan tree and let each child node reset associated state. */
1938 rhaas 940 GIC 129 : estate->es_query_dsa = pei->area;
2048 tgl 941 129 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
1938 rhaas 942 CBC 129 : estate->es_query_dsa = NULL;
2048 tgl 943 129 : }
2048 tgl 944 ECB :
945 : /*
946 : * Traverse plan tree to reinitialize per-node dynamic shared memory state
947 : */
948 : static bool
2048 tgl 949 GIC 333 : ExecParallelReInitializeDSM(PlanState *planstate,
950 : ParallelContext *pcxt)
2048 tgl 951 ECB : {
2048 tgl 952 GIC 333 : if (planstate == NULL)
2048 tgl 953 UIC 0 : return false;
2048 tgl 954 ECB :
2048 tgl 955 EUB : /*
956 : * Call reinitializers for DSM-using plan nodes.
957 : */
2048 tgl 958 GIC 333 : switch (nodeTag(planstate))
959 : {
2048 tgl 960 CBC 138 : case T_SeqScanState:
2048 tgl 961 GIC 138 : if (planstate->plan->parallel_aware)
2048 tgl 962 CBC 114 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
2048 tgl 963 ECB : pcxt);
2048 tgl 964 CBC 138 : break;
2048 tgl 965 GIC 6 : case T_IndexScanState:
2048 tgl 966 CBC 6 : if (planstate->plan->parallel_aware)
967 6 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
2048 tgl 968 ECB : pcxt);
2048 tgl 969 CBC 6 : break;
2048 tgl 970 GIC 6 : case T_IndexOnlyScanState:
2048 tgl 971 CBC 6 : if (planstate->plan->parallel_aware)
972 6 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
2048 tgl 973 ECB : pcxt);
2048 tgl 974 CBC 6 : break;
2048 tgl 975 UIC 0 : case T_ForeignScanState:
2048 tgl 976 LBC 0 : if (planstate->plan->parallel_aware)
2048 tgl 977 UBC 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
2048 tgl 978 EUB : pcxt);
2048 tgl 979 UBC 0 : break;
1951 rhaas 980 UIC 0 : case T_AppendState:
1951 rhaas 981 UBC 0 : if (planstate->plan->parallel_aware)
982 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
983 0 : break;
2048 tgl 984 0 : case T_CustomScanState:
985 0 : if (planstate->plan->parallel_aware)
986 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
2048 tgl 987 EUB : pcxt);
2048 tgl 988 UBC 0 : break;
2048 tgl 989 GIC 27 : case T_BitmapHeapScanState:
2048 tgl 990 GBC 27 : if (planstate->plan->parallel_aware)
2048 tgl 991 CBC 27 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
2048 tgl 992 ECB : pcxt);
2048 tgl 993 CBC 27 : break;
1936 andres 994 GIC 48 : case T_HashJoinState:
1936 andres 995 CBC 48 : if (planstate->plan->parallel_aware)
996 24 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1936 andres 997 ECB : pcxt);
1936 andres 998 CBC 48 : break;
1951 andres 999 GIC 63 : case T_HashState:
2048 tgl 1000 ECB : case T_SortState:
1098 tomas.vondra 1001 : case T_IncrementalSortState:
1002 : case T_MemoizeState:
1003 : /* these nodes have DSM state, but no reinitialization is required */
2048 tgl 1004 GIC 63 : break;
1005 :
2048 tgl 1006 CBC 45 : default:
2048 tgl 1007 GIC 45 : break;
2048 tgl 1008 ECB : }
1009 :
2048 tgl 1010 GIC 333 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1011 : }
2048 tgl 1012 ECB :
1013 : /*
1014 : * Copy instrumentation information about this node and its descendants from
1015 : * dynamic shared memory.
1016 : */
1017 : static bool
2750 rhaas 1018 GIC 507 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1019 : SharedExecutorInstrumentation *instrumentation)
2750 rhaas 1020 ECB : {
1021 : Instrumentation *instrument;
1022 : int i;
1023 : int n;
1024 : int ibytes;
2495 rhaas 1025 GIC 507 : int plan_node_id = planstate->plan->plan_node_id;
1026 : MemoryContext oldcontext;
2750 rhaas 1027 ECB :
1028 : /* Find the instrumentation for this node. */
2678 rhaas 1029 GIC 2235 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1030 2235 : if (instrumentation->plan_node_id[i] == plan_node_id)
2750 rhaas 1031 CBC 507 : break;
2678 1032 507 : if (i >= instrumentation->num_plan_nodes)
2750 rhaas 1033 LBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
2750 rhaas 1034 ECB :
2678 rhaas 1035 EUB : /* Accumulate the statistics from all workers. */
2678 rhaas 1036 GIC 507 : instrument = GetInstrumentationArray(instrumentation);
1037 507 : instrument += i * instrumentation->num_workers;
2678 rhaas 1038 CBC 1332 : for (n = 0; n < instrumentation->num_workers; ++n)
1039 825 : InstrAggNode(planstate->instrument, &instrument[n]);
2678 rhaas 1040 ECB :
1943 1041 : /*
1042 : * Also store the per-worker detail.
1043 : *
1044 : * Worker instrumentation should be allocated in the same context as the
1045 : * regular instrumentation information, which is the per-query context.
1046 : * Switch into per-query memory context.
1047 : */
1943 rhaas 1048 GIC 507 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1049 507 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1943 rhaas 1050 CBC 507 : planstate->worker_instrument =
1051 507 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1052 507 : MemoryContextSwitchTo(oldcontext);
2427 rhaas 1053 ECB :
2678 rhaas 1054 CBC 507 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
1943 rhaas 1055 GIC 507 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
2750 rhaas 1056 ECB :
1951 andres 1057 : /* Perform any node-type-specific work that needs to be done. */
1951 andres 1058 GIC 507 : switch (nodeTag(planstate))
1059 : {
1951 andres 1060 CBC 6 : case T_SortState:
1951 andres 1061 GIC 6 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1951 andres 1062 CBC 6 : break;
1098 tomas.vondra 1063 LBC 0 : case T_IncrementalSortState:
1064 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1098 tomas.vondra 1065 UBC 0 : break;
1951 andres 1066 GBC 42 : case T_HashState:
1067 42 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1951 andres 1068 CBC 42 : break;
1024 drowley 1069 54 : case T_AggState:
1070 54 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1071 54 : break;
634 drowley 1072 LBC 0 : case T_MemoizeState:
1073 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
737 drowley 1074 UBC 0 : break;
1951 andres 1075 GBC 405 : default:
1076 405 : break;
1951 andres 1077 ECB : }
2049 rhaas 1078 :
2750 rhaas 1079 GIC 507 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1080 : instrumentation);
2750 rhaas 1081 ECB : }
1082 :
1083 : /*
1084 : * Add up the workers' JIT instrumentation from dynamic shared memory.
1085 : */
1086 : static void
1657 andres 1087 GIC 12 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1088 : SharedJitInstrumentation *shared_jit)
1657 andres 1089 ECB : {
1090 : JitInstrumentation *combined;
1091 : int ibytes;
1092 :
1093 : int n;
1094 :
1095 : /*
1096 : * Accumulate worker JIT instrumentation into the combined JIT
1097 : * instrumentation, allocating it if required.
1098 : */
1649 andres 1099 GIC 12 : if (!planstate->state->es_jit_worker_instr)
1100 12 : planstate->state->es_jit_worker_instr =
1657 andres 1101 CBC 12 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1649 1102 12 : combined = planstate->state->es_jit_worker_instr;
1657 andres 1103 ECB :
1537 heikki.linnakangas 1104 : /* Accumulate all the workers' instrumentations. */
1657 andres 1105 GIC 36 : for (n = 0; n < shared_jit->num_workers; ++n)
1106 24 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1657 andres 1107 ECB :
1108 : /*
1109 : * Store the per-worker detail.
1110 : *
1111 : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1112 : * instrumentation in per-query context.
1113 : */
1657 andres 1114 GIC 12 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1418 tgl 1115 12 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1657 andres 1116 CBC 12 : planstate->worker_jit_instrument =
1117 12 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1657 andres 1118 ECB :
1657 andres 1119 CBC 12 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1657 andres 1120 GIC 12 : }
1657 andres 1121 ECB :
2750 rhaas 1122 : /*
1123 : * Finish parallel execution. We wait for parallel workers to finish, and
1124 : * accumulate their buffer/WAL usage.
1125 : */
1126 : void
2750 rhaas 1127 GIC 823 : ExecParallelFinish(ParallelExecutorInfo *pei)
1128 : {
2046 tgl 1129 CBC 823 : int nworkers = pei->pcxt->nworkers_launched;
1130 : int i;
2750 rhaas 1131 ECB :
1132 : /* Make this be a no-op if called twice in a row. */
2699 rhaas 1133 GIC 823 : if (pei->finished)
1134 374 : return;
2699 rhaas 1135 ECB :
2046 tgl 1136 : /*
1137 : * Detach from tuple queues ASAP, so that any still-active workers will
1138 : * notice that no further results are wanted.
1139 : */
2046 tgl 1140 GIC 449 : if (pei->tqueue != NULL)
1141 : {
2046 tgl 1142 CBC 1656 : for (i = 0; i < nworkers; i++)
2046 tgl 1143 GIC 1207 : shm_mq_detach(pei->tqueue[i]);
2046 tgl 1144 CBC 449 : pfree(pei->tqueue);
1145 449 : pei->tqueue = NULL;
2046 tgl 1146 ECB : }
1147 :
1148 : /*
1149 : * While we're waiting for the workers to finish, let's get rid of the
1150 : * tuple queue readers. (Any other local cleanup could be done here too.)
1151 : */
2046 tgl 1152 GIC 449 : if (pei->reader != NULL)
1153 : {
2046 tgl 1154 CBC 1647 : for (i = 0; i < nworkers; i++)
2046 tgl 1155 GIC 1207 : DestroyTupleQueueReader(pei->reader[i]);
2046 tgl 1156 CBC 440 : pfree(pei->reader);
1157 440 : pei->reader = NULL;
2046 tgl 1158 ECB : }
1159 :
1160 : /* Now wait for the workers to finish. */
2750 rhaas 1161 GIC 449 : WaitForParallelWorkersToFinish(pei->pcxt);
1162 :
2046 tgl 1163 ECB : /*
1164 : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1165 : * finish, or we might get incomplete data.)
1166 : */
2046 tgl 1167 GIC 1656 : for (i = 0; i < nworkers; i++)
1100 akapila 1168 1207 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
2750 rhaas 1169 ECB :
2699 rhaas 1170 CBC 449 : pei->finished = true;
1171 : }
2750 rhaas 1172 ECB :
1173 : /*
1174 : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1175 : * resources still exist after ExecParallelFinish. We separate these
1176 : * routines because someone might want to examine the contents of the DSM
1177 : * after ExecParallelFinish and before calling this routine.
1178 : */
1179 : void
2732 rhaas 1180 GIC 320 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1181 : {
1937 rhaas 1182 ECB : /* Accumulate instrumentation, if any. */
1937 rhaas 1183 GIC 320 : if (pei->instrumentation)
1184 93 : ExecParallelRetrieveInstrumentation(pei->planstate,
1937 rhaas 1185 ECB : pei->instrumentation);
1186 :
1187 : /* Accumulate JIT instrumentation, if any. */
1657 andres 1188 GIC 320 : if (pei->jit_instrumentation)
1189 12 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1418 tgl 1190 CBC 12 : pei->jit_instrumentation);
1657 andres 1191 ECB :
1970 rhaas 1192 : /* Free any serialized parameters. */
1970 rhaas 1193 GIC 320 : if (DsaPointerIsValid(pei->param_exec))
1194 : {
1970 rhaas 1195 CBC 15 : dsa_free(pei->area, pei->param_exec);
1970 rhaas 1196 GIC 15 : pei->param_exec = InvalidDsaPointer;
1970 rhaas 1197 ECB : }
2302 rhaas 1198 CBC 320 : if (pei->area != NULL)
1199 : {
1200 320 : dsa_detach(pei->area);
2302 rhaas 1201 GIC 320 : pei->area = NULL;
2302 rhaas 1202 ECB : }
2732 rhaas 1203 CBC 320 : if (pei->pcxt != NULL)
1204 : {
1205 320 : DestroyParallelContext(pei->pcxt);
2732 rhaas 1206 GIC 320 : pei->pcxt = NULL;
2732 rhaas 1207 ECB : }
2732 rhaas 1208 CBC 320 : pfree(pei);
2732 rhaas 1209 GIC 320 : }
2732 rhaas 1210 ECB :
2750 1211 : /*
1212 : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1213 : * for that purpose.
1214 : */
1215 : static DestReceiver *
2750 rhaas 1216 GIC 1210 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1217 : {
2750 rhaas 1218 ECB : char *mqspace;
1219 : shm_mq *mq;
1220 :
2134 tgl 1221 GIC 1210 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
2750 rhaas 1222 1210 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
2750 rhaas 1223 CBC 1210 : mq = (shm_mq *) mqspace;
1224 1210 : shm_mq_set_sender(mq, MyProc);
1225 1210 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
2750 rhaas 1226 ECB : }
1227 :
1228 : /*
1229 : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1230 : */
1231 : static QueryDesc *
2750 rhaas 1232 GIC 1210 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1233 : int instrument_options)
2750 rhaas 1234 ECB : {
1235 : char *pstmtspace;
1236 : char *paramspace;
1237 : PlannedStmt *pstmt;
1238 : ParamListInfo paramLI;
1239 : char *queryString;
1240 :
1241 : /* Get the query string from shared memory */
2134 tgl 1242 GIC 1210 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1243 :
2750 rhaas 1244 ECB : /* Reconstruct leader-supplied PlannedStmt. */
2134 tgl 1245 GIC 1210 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
2750 rhaas 1246 1210 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
2750 rhaas 1247 ECB :
1248 : /* Reconstruct ParamListInfo. */
1970 rhaas 1249 GIC 1210 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
2750 1250 1210 : paramLI = RestoreParamList(¶mspace);
2750 rhaas 1251 ECB :
1657 alvherre 1252 : /* Create a QueryDesc for the query. */
2750 rhaas 1253 GIC 1210 : return CreateQueryDesc(pstmt,
1254 : queryString,
2750 rhaas 1255 ECB : GetActiveSnapshot(), InvalidSnapshot,
1256 : receiver, paramLI, NULL, instrument_options);
1257 : }
1258 :
1259 : /*
1260 : * Copy instrumentation information from this node and its descendants into
1261 : * dynamic shared memory, so that the parallel leader can retrieve it.
1262 : */
1263 : static bool
2750 rhaas 1264 GIC 1169 : ExecParallelReportInstrumentation(PlanState *planstate,
1265 : SharedExecutorInstrumentation *instrumentation)
2750 rhaas 1266 ECB : {
1267 : int i;
2495 rhaas 1268 GIC 1169 : int plan_node_id = planstate->plan->plan_node_id;
1269 : Instrumentation *instrument;
2678 rhaas 1270 ECB :
2678 rhaas 1271 GIC 1169 : InstrEndLoop(planstate->instrument);
1272 :
2750 rhaas 1273 ECB : /*
1274 : * If we shuffled the plan_node_id values in ps_instrument into sorted
1275 : * order, we could use binary search here. This might matter someday if
1276 : * we're pushing down sufficiently large plan trees. For now, do it the
1277 : * slow, dumb way.
1278 : */
2678 rhaas 1279 GIC 3713 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1280 3713 : if (instrumentation->plan_node_id[i] == plan_node_id)
2750 rhaas 1281 CBC 1169 : break;
2678 1282 1169 : if (i >= instrumentation->num_plan_nodes)
2750 rhaas 1283 LBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
2750 rhaas 1284 ECB :
2750 rhaas 1285 EUB : /*
1286 : * Add our statistics to the per-node, per-worker totals. It's possible
1287 : * that this could happen more than once if we relaunched workers.
1288 : */
2678 rhaas 1289 GIC 1169 : instrument = GetInstrumentationArray(instrumentation);
1290 1169 : instrument += i * instrumentation->num_workers;
2678 rhaas 1291 CBC 1169 : Assert(IsParallelWorker());
1292 1169 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1293 1169 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
2750 rhaas 1294 ECB :
2750 rhaas 1295 CBC 1169 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1296 : instrumentation);
2750 rhaas 1297 ECB : }
1298 :
1299 : /*
1300 : * Initialize the PlanState and its descendants with the information
1301 : * retrieved from shared memory. This has to be done once the PlanState
1302 : * is allocated and initialized by executor; that is, after ExecutorStart().
1303 : */
1304 : static bool
1970 andres 1305 GIC 3962 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1306 : {
2706 rhaas 1307 CBC 3962 : if (planstate == NULL)
2706 rhaas 1308 UIC 0 : return false;
2706 rhaas 1309 ECB :
2049 rhaas 1310 GBC 3962 : switch (nodeTag(planstate))
1311 : {
2049 rhaas 1312 CBC 1626 : case T_SeqScanState:
2049 rhaas 1313 GIC 1626 : if (planstate->plan->parallel_aware)
1970 andres 1314 CBC 1312 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
2049 rhaas 1315 1626 : break;
1316 186 : case T_IndexScanState:
1317 186 : if (planstate->plan->parallel_aware)
1970 andres 1318 48 : ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1970 andres 1319 ECB : pwcxt);
2049 rhaas 1320 CBC 186 : break;
2049 rhaas 1321 GIC 118 : case T_IndexOnlyScanState:
2049 rhaas 1322 CBC 118 : if (planstate->plan->parallel_aware)
1970 andres 1323 100 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1970 andres 1324 ECB : pwcxt);
2049 rhaas 1325 CBC 118 : break;
2049 rhaas 1326 UIC 0 : case T_ForeignScanState:
2049 rhaas 1327 LBC 0 : if (planstate->plan->parallel_aware)
2622 rhaas 1328 UBC 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1970 andres 1329 EUB : pwcxt);
2049 rhaas 1330 UBC 0 : break;
1951 rhaas 1331 GIC 182 : case T_AppendState:
1951 rhaas 1332 GBC 182 : if (planstate->plan->parallel_aware)
1951 rhaas 1333 CBC 152 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1334 182 : break;
2049 rhaas 1335 LBC 0 : case T_CustomScanState:
1336 0 : if (planstate->plan->parallel_aware)
2622 rhaas 1337 UBC 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1970 andres 1338 EUB : pwcxt);
2049 rhaas 1339 UBC 0 : break;
2049 rhaas 1340 GIC 139 : case T_BitmapHeapScanState:
2049 rhaas 1341 GBC 139 : if (planstate->plan->parallel_aware)
1970 andres 1342 CBC 138 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1970 andres 1343 ECB : pwcxt);
2049 rhaas 1344 CBC 139 : break;
1936 andres 1345 GIC 267 : case T_HashJoinState:
1936 andres 1346 CBC 267 : if (planstate->plan->parallel_aware)
1347 147 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1936 andres 1348 ECB : pwcxt);
1936 andres 1349 CBC 267 : break;
1951 andres 1350 GIC 267 : case T_HashState:
1951 andres 1351 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1951 andres 1352 CBC 267 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1951 andres 1353 GIC 267 : break;
2049 rhaas 1354 CBC 214 : case T_SortState:
1951 andres 1355 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1970 andres 1356 CBC 214 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
2048 tgl 1357 GIC 214 : break;
1098 tomas.vondra 1358 LBC 0 : case T_IncrementalSortState:
1098 tomas.vondra 1359 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1098 tomas.vondra 1360 UBC 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1361 : pwcxt);
1362 0 : break;
1024 drowley 1363 GIC 768 : case T_AggState:
1024 drowley 1364 EUB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1024 drowley 1365 CBC 768 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1024 drowley 1366 GIC 768 : break;
634 drowley 1367 CBC 6 : case T_MemoizeState:
737 drowley 1368 ECB : /* even when not parallel-aware, for EXPLAIN ANALYZE */
634 drowley 1369 CBC 6 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
737 drowley 1370 GIC 6 : break;
2049 rhaas 1371 CBC 189 : default:
1372 189 : break;
2706 rhaas 1373 ECB : }
1374 :
1970 andres 1375 GIC 3962 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1376 : pwcxt);
2706 rhaas 1377 ECB : }
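
Each parallel-aware case in the switch above hands the worker's shm_toc to a node-specific *InitializeWorker routine, which typically attaches to per-node shared state the leader stored under that node's plan_node_id. A minimal sketch of the pattern for a made-up node type (FooScanState, SharedFooState, and sketch_ExecFooScanInitializeWorker are illustrative names only; see ExecSeqScanInitializeWorker and friends for real examples):

/* Per-node state the leader would have placed in the DSM segment. */
typedef struct SharedFooState
{
    /* whatever coordination state the scan needs, e.g. a shared cursor */
    pg_atomic_uint64 next_item;
} SharedFooState;

typedef struct FooScanState
{
    ScanState   ss;             /* first field, as in other scan nodes */
    SharedFooState *shared;     /* pointer into the DSM segment */
} FooScanState;

static void
sketch_ExecFooScanInitializeWorker(FooScanState *node,
                                   ParallelWorkerContext *pwcxt)
{
    /* The leader published this entry under the node's plan_node_id. */
    node->shared = shm_toc_lookup(pwcxt->toc,
                                  node->ss.ps.plan->plan_node_id,
                                  false);
}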
1378 :
1379 : /*
1380 : * Main entrypoint for parallel query worker processes.
1381 : *
1382 : * We reach this function from ParallelWorkerMain, so the setup necessary to
1383 : * create a sensible parallel environment has already been done;
1384 : * ParallelWorkerMain worries about stuff like the transaction state, combo
1385 : * CID mappings, and GUC values, so we don't need to deal with any of that
1386 : * here.
1387 : *
1388 : * Our job is to deal with concerns specific to the executor. The parallel
1389 : * group leader will have stored a serialized PlannedStmt, and it's our job
1390 : * to execute that plan and write the resulting tuples to the appropriate
1391 : * tuple queue. Various bits of supporting information that we need in order
1392 : * to do this are also stored in the dsm_segment and can be accessed through
1393 : * the shm_toc.
1394 : */
1395 : void
2750 rhaas 1396 GIC 1210 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1397 : {
2049 rhaas 1398 ECB : FixedParallelExecutorState *fpes;
1399 : BufferUsage *buffer_usage;
1400 : WalUsage *wal_usage;
1401 : DestReceiver *receiver;
1402 : QueryDesc *queryDesc;
1403 : SharedExecutorInstrumentation *instrumentation;
1404 : SharedJitInstrumentation *jit_instrumentation;
2750 rhaas 1405 GIC 1210 : int instrument_options = 0;
1406 : void *area_space;
2302 rhaas 1407 ECB : dsa_area *area;
1408 : ParallelWorkerContext pwcxt;
1409 :
1410 : /* Get fixed-size state. */
2049 rhaas 1411 GIC 1210 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1412 :
2750 rhaas 1413 ECB : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
2750 rhaas 1414 GIC 1210 : receiver = ExecParallelGetReceiver(seg, toc);
2134 tgl 1415 1210 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
2750 rhaas 1416 CBC 1210 : if (instrumentation != NULL)
1417 368 : instrument_options = instrumentation->instrument_options;
1657 andres 1418 1210 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1657 andres 1419 ECB : true);
2750 rhaas 1420 CBC 1210 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1421 :
2237 rhaas 1422 ECB : /* Setting debug_query_string for individual workers */
2237 rhaas 1423 GIC 1210 : debug_query_string = queryDesc->sourceText;
1424 :
378 michael 1425 ECB : /* Report workers' query for monitoring purposes */
2237 rhaas 1426 GIC 1210 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1427 :
2302 rhaas 1428 ECB : /* Attach to the dynamic shared memory area. */
2134 tgl 1429 GIC 1210 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
2302 rhaas 1430 1210 : area = dsa_attach_in_place(area_space, seg);
2302 rhaas 1431 ECB :
1432 : /* Start up the executor */
1844 andres 1433 GIC 1210 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1966 rhaas 1434 1210 : ExecutorStart(queryDesc, fpes->eflags);
2302 rhaas 1435 ECB :
1436 : /* Special executor initialization steps for parallel workers */
2302 rhaas 1437 GIC 1210 : queryDesc->planstate->state->es_query_dsa = area;
1970 1438 1210 : if (DsaPointerIsValid(fpes->param_exec))
1970 rhaas 1439 ECB : {
1440 : char *paramexec_space;
1441 :
1970 rhaas 1442 GIC 35 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1443 35 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1970 rhaas 1444 ECB : }
1970 andres 1445 CBC 1210 : pwcxt.toc = toc;
1970 andres 1446 GIC 1210 : pwcxt.seg = seg;
1970 andres 1447 CBC 1210 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
2302 rhaas 1448 ECB :
2049 1449 : /* Pass down any tuple bound */
2049 rhaas 1450 GIC 1210 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1451 :
1710 akapila 1452 ECB : /*
1453 : * Prepare to track buffer/WAL usage during query execution.
1454 : *
1455 : * We do this after starting up the executor to match what happens in the
1456 : * leader, which also doesn't count buffer accesses and WAL activity that
1457 : * occur during executor startup.
1458 : */
1710 akapila 1459 GIC 1210 : InstrStartParallelQuery();
1460 :
2049 rhaas 1461 ECB : /*
1462 : * Run the plan. If we specified a tuple bound, be careful not to demand
1463 : * more tuples than that.
1464 : */
2049 rhaas 1465 GIC 1210 : ExecutorRun(queryDesc,
1466 : ForwardScanDirection,
2049 rhaas 1467 CBC 1210 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1468 : true);
2302 rhaas 1469 ECB :
1470 : /* Shut down the executor */
2750 rhaas 1471 GIC 1207 : ExecutorFinish(queryDesc);
1472 :
1100 akapila 1473 ECB : /* Report buffer/WAL usage during parallel execution. */
2134 tgl 1474 GIC 1207 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1100 akapila 1475 1207 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1100 akapila 1476 CBC 1207 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1477 1207 : &wal_usage[ParallelWorkerNumber]);
2750 rhaas 1478 ECB :
1479 : /* Report instrumentation data if any instrumentation options are set. */
2750 rhaas 1480 GIC 1207 : if (instrumentation != NULL)
1481 368 : ExecParallelReportInstrumentation(queryDesc->planstate,
2750 rhaas 1482 ECB : instrumentation);
1483 :
1484 : /* Report JIT instrumentation data if any */
1657 andres 1485 GIC 1207 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1486 : {
1657 andres 1487 CBC 72 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1657 andres 1488 GIC 72 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1657 andres 1489 CBC 72 : queryDesc->estate->es_jit->instr;
1657 andres 1490 ECB : }
1491 :
1492 : /* Must do this after capturing instrumentation. */
2748 rhaas 1493 GIC 1207 : ExecutorEnd(queryDesc);
1494 :
2750 rhaas 1495 ECB : /* Cleanup. */
2302 rhaas 1496 GIC 1207 : dsa_detach(area);
2750 1497 1207 : FreeQueryDesc(queryDesc);
2040 peter_e 1498 CBC 1207 : receiver->rDestroy(receiver);
2750 rhaas 1499 1207 : }
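
The DestReceiver obtained near the top of ParallelQueryMain is what carries this worker's result tuples back to the leader: ExecParallelGetReceiver, earlier in this file, attaches to this worker's slice of the shared tuple-queue space and wraps it in a tuple-queue receiver. A simplified sketch of that idea (sketch_get_receiver is a hypothetical name, and the real function differs in detail):

/*
 * Simplified sketch of building a tuple-queue DestReceiver for this
 * worker; see ExecParallelGetReceiver for the real implementation.
 */
static DestReceiver *
sketch_get_receiver(dsm_segment *seg, shm_toc *toc)
{
    char       *mqspace;
    shm_mq     *mq;

    /* Each worker gets its own PARALLEL_TUPLE_QUEUE_SIZE slice. */
    mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    mq = (shm_mq *) mqspace;

    shm_mq_set_sender(mq, MyProc);
    return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}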