/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *    Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * with the single-copy flag set.  In that case, the Gather node runs the
 * plan in one worker, does not execute the plan itself, and simply returns
 * whatever tuples the worker returns.  If no worker can be obtained, it
 * falls back to running the plan itself and returning the results.
 * Therefore, a plan used with a single-copy Gather node need not be
 * parallel-aware.
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
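
/*
 * Overview of the leader-side code below:
 *
 *   ExecInitGather()     build the GatherState; no parallel machinery is
 *                        set up yet.
 *   ExecGather()         on first call, create or re-initialize the shared
 *                        parallel state, launch workers, and set up one
 *                        tuple queue reader per launched worker; then
 *                        return tuples, projecting them if necessary.
 *   gather_getnext()     return the next tuple, preferring the worker
 *                        queues and falling back to running the plan
 *                        locally when allowed.
 *   gather_readnext()    poll the tuple queues round-robin, waiting on the
 *                        process latch when nothing is ready.
 *   ExecShutdownGather() / ExecReScanGather()  tear down or reset the above.
 */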

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *      ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
    GatherState *gatherstate;
    Plan       *outerNode;
    TupleDesc   tupDesc;

    /* Gather node doesn't have innerPlan node. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gatherstate = makeNode(GatherState);
    gatherstate->ps.plan = (Plan *) node;
    gatherstate->ps.state = estate;
    gatherstate->ps.ExecProcNode = ExecGather;

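    /*
     * By default the leader itself participates in executing the plan,
     * unless this is a single-copy Gather or parallel_leader_participation
     * is off.  tuples_needed of -1 means "no particular limit"; if a bound
     * applies (for instance under a Limit node), ExecSetTupleBound can
     * lower it later.
     */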
    gatherstate->initialized = false;
    gatherstate->need_to_scan_locally =
        !node->single_copy && parallel_leader_participation;
    gatherstate->tuples_needed = -1;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gatherstate->ps);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
    tupDesc = ExecGetResultType(outerPlanState(gatherstate));

    /*
     * Leader may access ExecProcNode result directly (if
     * need_to_scan_locally), or from workers via tuple queue.  So we can't
     * trivially rely on the slot type being fixed for expressions evaluated
     * within this node.
     */
    gatherstate->ps.outeropsset = true;
    gatherstate->ps.outeropsfixed = false;

    /*
     * Initialize result type and projection.
     */
    ExecInitResultTypeTL(&gatherstate->ps);
    ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

    /*
     * Without projections result slot type is not trivially known, see
     * comment above.
     */
    if (gatherstate->ps.ps_ProjInfo == NULL)
    {
        gatherstate->ps.resultopsset = true;
        gatherstate->ps.resultopsfixed = false;
    }

    /*
     * Initialize funnel slot to same tuple descriptor as outer plan.
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
                                                      &TTSOpsMinimalTuple);

    /*
     * Gather doesn't support checking a qual (it's always more efficient to
     * do it in the child node).
     */
    Assert(!node->plan.qual);

    return gatherstate;
}

/* ----------------------------------------------------------------
 *      ExecGather(node)
 *
 *      Scans the relation via multiple workers and returns
 *      the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
    GatherState *node = castNode(GatherState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * Initialize the parallel context and workers on first execution.  We do
     * this on first execution rather than during node initialization, as it
     * needs to allocate a large dynamic segment, so it is better to do it
     * only if it is really needed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        Gather     *gather = (Gather *) node->ps.plan;

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gather->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /* Initialize, or re-initialize, shared state needed by workers. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(outerPlanState(node),
                                                 estate,
                                                 gather->initParam,
                                                 gather->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(outerPlanState(node),
                                         node->pei,
                                         gather->initParam);

            /*
             * Register backend workers.  We might not get as many as we
             * requested, or indeed any at all.
             */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /* Make a working array showing the active readers */
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
            node->nextreader = 0;
        }

        /*
         * Run the plan locally if we have no workers at all, or if leader
         * participation is enabled and this is not a single-copy Gather.
         */
        node->need_to_scan_locally = (node->nreaders == 0)
            || (!gather->single_copy && parallel_leader_participation);
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *      ExecEndGather
 *
 *      frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
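    /* Shut down workers, if any, and release parallel-query resources. */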
    ExecShutdownGather(node);
    ExecFreeExprContext(&node->ps);
    if (node->ps.ps_ResultTupleSlot)
        ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
    MinimalTuple tup;

    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
        CHECK_FOR_INTERRUPTS();

        if (gatherstate->nreaders > 0)
        {
            tup = gather_readnext(gatherstate);

            if (HeapTupleIsValid(tup))
            {
                ExecStoreMinimalTuple(tup,      /* tuple to store */
                                      fslot,    /* slot to store the tuple */
                                      false);   /* don't pfree tuple */
                return fslot;
            }
        }

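        /*
         * No tuple was available from the workers just now; generate one
         * locally, if we're allowed to.
         */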
        if (gatherstate->need_to_scan_locally)
        {
            EState     *estate = gatherstate->ps.state;

            /* Install our DSA area while executing the plan. */
            estate->es_query_dsa =
                gatherstate->pei ? gatherstate->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;

            if (!TupIsNull(outerTupleSlot))
                return outerTupleSlot;

            gatherstate->need_to_scan_locally = false;
        }
    }

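    /* All workers are done and the local scan (if any) is exhausted. */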
    return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
    int         nvisited = 0;

    for (;;)
    {
        TupleQueueReader *reader;
        MinimalTuple tup;
        bool        readerdone;

        /* Check for async events, particularly messages from workers. */
        CHECK_FOR_INTERRUPTS();

        /*
         * Attempt to read a tuple, but don't block if none is available.
         *
         * Note that TupleQueueReaderNext will just return NULL for a worker
         * which fails to initialize.  We'll treat that worker as having
         * produced no tuples; WaitForParallelWorkersToFinish will error out
         * when we get there.
         */
        Assert(gatherstate->nextreader < gatherstate->nreaders);
        reader = gatherstate->reader[gatherstate->nextreader];
        tup = TupleQueueReaderNext(reader, true, &readerdone);

        /*
         * If this reader is done, remove it from our working array of active
         * readers.  If all readers are done, we're outta here.
         */
        if (readerdone)
        {
            Assert(!tup);
            --gatherstate->nreaders;
            if (gatherstate->nreaders == 0)
            {
                ExecShutdownGatherWorkers(gatherstate);
                return NULL;
            }
            memmove(&gatherstate->reader[gatherstate->nextreader],
                    &gatherstate->reader[gatherstate->nextreader + 1],
                    sizeof(TupleQueueReader *)
                    * (gatherstate->nreaders - gatherstate->nextreader));
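            /* Compaction may leave nextreader pointing past the last element. */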
            if (gatherstate->nextreader >= gatherstate->nreaders)
                gatherstate->nextreader = 0;
            continue;
        }

        /* If we got a tuple, return it. */
        if (tup)
            return tup;

        /*
         * Advance nextreader pointer in round-robin fashion.  Note that we
         * only reach this code if we weren't able to get a tuple from the
         * current worker.  We used to advance the nextreader pointer after
         * every tuple, but it turns out to be much more efficient to keep
         * reading from the same queue until that would require blocking.
         */
        gatherstate->nextreader++;
        if (gatherstate->nextreader >= gatherstate->nreaders)
            gatherstate->nextreader = 0;

        /* Have we visited every (surviving) TupleQueueReader? */
        nvisited++;
        if (nvisited >= gatherstate->nreaders)
        {
            /*
             * If (still) running plan locally, return NULL so caller can
             * generate another tuple from the local copy of the plan.
             */
            if (gatherstate->need_to_scan_locally)
                return NULL;

            /*
             * Nothing to do but wait for developments; a worker will set our
             * latch when it writes to its tuple queue or shuts down.
             */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_EXECUTE_GATHER);
            ResetLatch(MyLatch);
            nvisited = 0;
        }
    }
}

/* ----------------------------------------------------------------
 *      ExecShutdownGatherWorkers
 *
 *      Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
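    /*
     * Wait for any launched workers to finish; ExecParallelFinish also folds
     * their buffer usage statistics into the leader's totals.
     */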
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);

    /* Flush local copy of reader array */
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;
}

/* ----------------------------------------------------------------
 *      ExecShutdownGather
 *
 *      Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
    ExecShutdownGatherWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *                      Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *      ExecReScanGather
 *
 *      Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
    Gather     *gather = (Gather *) node->ps.plan;
    PlanState  *outerPlan = outerPlanState(node);

    /* Make sure any existing workers are gracefully shut down */
    ExecShutdownGatherWorkers(node);

    /* Mark node so that shared state will be rebuilt at next call */
    node->initialized = false;

    /*
     * Set child node's chgParam to tell it that the next scan might deliver a
     * different set of rows within the leader process.  (The overall rowset
     * shouldn't change, but the leader process's subset might; hence nodes
     * between here and the parallel table scan node mustn't optimize on the
     * assumption of an unchanging rowset.)
     */
    if (gather->rescan_param >= 0)
        outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                             gather->rescan_param);

    /*
     * If chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.  Note: because this does nothing if we have a
     * rescan_param, it's currently guaranteed that parallel-aware child nodes
     * will not see a ReScan call until after they get a ReInitializeDSM call.
     * That ordering might not be something to rely on, though.  A good rule
     * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     * should reset only local state, and anything that depends on both of
     * those steps being finished must wait until the first ExecProcNode call.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}