Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * nodeGather.c
4 : : * Support routines for scanning a plan via multiple workers.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * A Gather executor launches parallel workers to run multiple copies of a
10 : : * plan. It can also run the plan itself, if the workers are not available
11 : : * or have not started up yet. It then merges all of the results it produces
12 : : * and the results from the workers into a single output stream. Therefore,
13 : : * it will normally be used with a plan where running multiple copies of the
14 : : * same plan does not produce duplicate output, such as parallel-aware
15 : : * SeqScan.
16 : : *
17 : : * Alternatively, a Gather node can be configured to use just one worker
18 : : * and the single-copy flag can be set. In this case, the Gather node will
19 : : * run the plan in one worker and will not execute the plan itself. In
20 : : * this case, it simply returns whatever tuples were returned by the worker.
21 : : * If a worker cannot be obtained, then it will run the plan itself and
22 : : * return the results. Therefore, a plan used with a single-copy Gather
23 : : * node need not be parallel-aware.
24 : : *
25 : : * IDENTIFICATION
26 : : * src/backend/executor/nodeGather.c
27 : : *
28 : : *-------------------------------------------------------------------------
29 : : */
30 : :
31 : : #include "postgres.h"
32 : :
33 : : #include "executor/execParallel.h"
34 : : #include "executor/executor.h"
35 : : #include "executor/nodeGather.h"
36 : : #include "executor/tqueue.h"
37 : : #include "miscadmin.h"
38 : : #include "optimizer/optimizer.h"
39 : : #include "utils/wait_event.h"
40 : :
41 : :
42 : : static TupleTableSlot *ExecGather(PlanState *pstate);
43 : : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
44 : : static MinimalTuple gather_readnext(GatherState *gatherstate);
45 : : static void ExecShutdownGatherWorkers(GatherState *node);
46 : :
47 : :
48 : : /* ----------------------------------------------------------------
49 : : * ExecInitGather
50 : : * ----------------------------------------------------------------
51 : : */
52 : : GatherState *
3119 rhaas@postgresql.org 53 :CBC 485 : ExecInitGather(Gather *node, EState *estate, int eflags)
54 : : {
55 : : GatherState *gatherstate;
56 : : Plan *outerNode;
57 : : TupleDesc tupDesc;
58 : :
59 : : /* Gather node doesn't have innerPlan node. */
60 [ - + ]: 485 : Assert(innerPlan(node) == NULL);
61 : :
62 : : /*
63 : : * create state structure
64 : : */
65 : 485 : gatherstate = makeNode(GatherState);
66 : 485 : gatherstate->ps.plan = (Plan *) node;
67 : 485 : gatherstate->ps.state = estate;
2463 andres@anarazel.de 68 : 485 : gatherstate->ps.ExecProcNode = ExecGather;
69 : :
2419 tgl@sss.pgh.pa.us 70 : 485 : gatherstate->initialized = false;
2342 rhaas@postgresql.org 71 : 485 : gatherstate->need_to_scan_locally =
72 [ + + + + ]: 485 : !node->single_copy && parallel_leader_participation;
2420 73 : 485 : gatherstate->tuples_needed = -1;
74 : :
75 : : /*
76 : : * Miscellaneous initialization
77 : : *
78 : : * create expression context for node
79 : : */
3119 80 : 485 : ExecAssignExprContext(estate, &gatherstate->ps);
81 : :
82 : : /*
83 : : * now initialize outer plan
84 : : */
3091 85 : 485 : outerNode = outerPlan(node);
86 : 485 : outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
2249 andres@anarazel.de 87 : 485 : tupDesc = ExecGetResultType(outerPlanState(gatherstate));
88 : :
89 : : /*
90 : : * Leader may access ExecProcNode result directly (if
91 : : * need_to_scan_locally), or from workers via tuple queue. So we can't
92 : : * trivially rely on the slot type being fixed for expressions evaluated
93 : : * within this node.
94 : : */
1977 95 : 485 : gatherstate->ps.outeropsset = true;
96 : 485 : gatherstate->ps.outeropsfixed = false;
97 : :
98 : : /*
99 : : * Initialize result type and projection.
100 : : */
1983 101 : 485 : ExecInitResultTypeTL(&gatherstate->ps);
2249 102 : 485 : ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
103 : :
104 : : /*
105 : : * Without projections result slot type is not trivially known, see
106 : : * comment above.
107 : : */
1977 108 [ + + ]: 485 : if (gatherstate->ps.ps_ProjInfo == NULL)
109 : : {
110 : 464 : gatherstate->ps.resultopsset = true;
111 : 464 : gatherstate->ps.resultopsfixed = false;
112 : : }
113 : :
114 : : /*
115 : : * Initialize funnel slot to same tuple descriptor as outer plan.
116 : : */
117 : 485 : gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
118 : : &TTSOpsMinimalTuple);
119 : :
120 : : /*
121 : : * Gather doesn't support checking a qual (it's always more efficient to
122 : : * do it in the child node).
123 : : */
2249 124 [ - + ]: 485 : Assert(!node->plan.qual);
125 : :
3119 rhaas@postgresql.org 126 : 485 : return gatherstate;
127 : : }
128 : :
129 : : /* ----------------------------------------------------------------
130 : : * ExecGather(node)
131 : : *
132 : : * Scans the relation via multiple workers and returns
133 : : * the next qualifying tuple.
134 : : * ----------------------------------------------------------------
135 : : */
136 : : static TupleTableSlot *
2463 andres@anarazel.de 137 : 1462568 : ExecGather(PlanState *pstate)
138 : : {
139 : 1462568 : GatherState *node = castNode(GatherState, pstate);
140 : : TupleTableSlot *slot;
141 : : ExprContext *econtext;
142 : :
2455 143 [ + + ]: 1462568 : CHECK_FOR_INTERRUPTS();
144 : :
145 : : /*
146 : : * Initialize the parallel context and workers on first execution. We do
147 : : * this on first execution rather than during node initialization, as it
148 : : * needs to allocate a large dynamic segment, so it is better to do it
149 : : * only if it is really needed.
150 : : */
3103 rhaas@postgresql.org 151 [ + + ]: 1462568 : if (!node->initialized)
152 : : {
3119 153 : 386 : EState *estate = node->ps.state;
3103 154 : 386 : Gather *gather = (Gather *) node->ps.plan;
155 : :
156 : : /*
157 : : * Sometimes we might have to run without parallelism; but if parallel
158 : : * mode is active then we can try to fire up some workers.
159 : : */
2361 160 [ + - + + ]: 386 : if (gather->num_workers > 0 && estate->es_use_parallel_mode)
161 : : {
162 : : ParallelContext *pcxt;
163 : :
164 : : /* Initialize, or re-initialize, shared state needed by workers. */
3089 165 [ + + ]: 383 : if (!node->pei)
647 tgl@sss.pgh.pa.us 166 : 269 : node->pei = ExecInitParallelPlan(outerPlanState(node),
167 : : estate,
168 : : gather->initParam,
169 : : gather->num_workers,
170 : : node->tuples_needed);
171 : : else
172 : 114 : ExecParallelReinitialize(outerPlanState(node),
2341 rhaas@postgresql.org 173 : 114 : node->pei,
174 : : gather->initParam);
175 : :
176 : : /*
177 : : * Register backend workers. We might not get as many as we
178 : : * requested, or indeed any at all.
179 : : */
3082 180 : 383 : pcxt = node->pei->pcxt;
181 : 383 : LaunchParallelWorkers(pcxt);
182 : : /* We save # workers launched for the benefit of EXPLAIN */
2921 183 : 383 : node->nworkers_launched = pcxt->nworkers_launched;
184 : :
185 : : /* Set up tuple queue readers to read the results. */
2963 186 [ + + ]: 383 : if (pcxt->nworkers_launched > 0)
187 : : {
2404 andres@anarazel.de 188 : 380 : ExecParallelCreateReaders(node->pei);
189 : : /* Make a working array showing the active readers */
2417 tgl@sss.pgh.pa.us 190 : 380 : node->nreaders = pcxt->nworkers_launched;
191 : 380 : node->reader = (TupleQueueReader **)
192 : 380 : palloc(node->nreaders * sizeof(TupleQueueReader *));
193 : 380 : memcpy(node->reader, node->pei->reader,
194 : 380 : node->nreaders * sizeof(TupleQueueReader *));
195 : : }
196 : : else
197 : : {
198 : : /* No workers? Then never mind. */
199 : 3 : node->nreaders = 0;
200 : 3 : node->reader = NULL;
201 : : }
202 : 383 : node->nextreader = 0;
203 : : }
204 : :
205 : : /* Run plan locally if no workers or enabled and not single-copy. */
206 : 772 : node->need_to_scan_locally = (node->nreaders == 0)
2342 rhaas@postgresql.org 207 [ + + + + : 386 : || (!gather->single_copy && parallel_leader_participation);
+ + ]
3103 208 : 386 : node->initialized = true;
209 : : }
210 : :
211 : : /*
212 : : * Reset per-tuple memory context to free any expression evaluation
213 : : * storage allocated in the previous tuple cycle.
214 : : */
3091 215 : 1462568 : econtext = node->ps.ps_ExprContext;
216 : 1462568 : ResetExprContext(econtext);
217 : :
218 : : /*
219 : : * Get next tuple, either from one of our workers, or by running the plan
220 : : * ourselves.
221 : : */
2639 tgl@sss.pgh.pa.us 222 : 1462568 : slot = gather_getnext(node);
223 [ + - + + ]: 1462565 : if (TupIsNull(slot))
224 : 383 : return NULL;
225 : :
226 : : /* If no projection is required, we're done. */
2332 rhaas@postgresql.org 227 [ + - ]: 1462182 : if (node->ps.ps_ProjInfo == NULL)
228 : 1462182 : return slot;
229 : :
230 : : /*
231 : : * Form the result tuple using ExecProject(), and return it.
232 : : */
2639 tgl@sss.pgh.pa.us 233 :UBC 0 : econtext->ecxt_outertuple = slot;
234 : 0 : return ExecProject(node->ps.ps_ProjInfo);
235 : : }
236 : :
237 : : /* ----------------------------------------------------------------
238 : : * ExecEndGather
239 : : *
240 : : * frees any storage allocated through C routines.
241 : : * ----------------------------------------------------------------
242 : : */
243 : : void
3119 rhaas@postgresql.org 244 :CBC 482 : ExecEndGather(GatherState *node)
245 : : {
2524 bruce@momjian.us 246 : 482 : ExecEndNode(outerPlanState(node)); /* let children clean up first */
3119 rhaas@postgresql.org 247 : 482 : ExecShutdownGather(node);
248 : 482 : }
249 : :
250 : : /*
251 : : * Read the next tuple. We might fetch a tuple from one of the tuple queues
252 : : * using gather_readnext, or if no tuple queue contains a tuple and the
253 : : * single_copy flag is not set, we might generate one locally instead.
254 : : */
255 : : static TupleTableSlot *
256 : 1462568 : gather_getnext(GatherState *gatherstate)
257 : : {
3091 258 : 1462568 : PlanState *outerPlan = outerPlanState(gatherstate);
259 : : TupleTableSlot *outerTupleSlot;
260 : 1462568 : TupleTableSlot *fslot = gatherstate->funnel_slot;
261 : : MinimalTuple tup;
262 : :
2417 tgl@sss.pgh.pa.us 263 [ + + + + ]: 2925676 : while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
264 : : {
2455 andres@anarazel.de 265 [ + + ]: 1462725 : CHECK_FOR_INTERRUPTS();
266 : :
2417 tgl@sss.pgh.pa.us 267 [ + + ]: 1462725 : if (gatherstate->nreaders > 0)
268 : : {
3082 rhaas@postgresql.org 269 : 1375328 : tup = gather_readnext(gatherstate);
270 : :
3119 271 [ + + ]: 1375325 : if (HeapTupleIsValid(tup))
272 : : {
1068 tgl@sss.pgh.pa.us 273 : 1151760 : ExecStoreMinimalTuple(tup, /* tuple to store */
274 : : fslot, /* slot to store the tuple */
275 : : false); /* don't pfree tuple */
3091 rhaas@postgresql.org 276 : 1151760 : return fslot;
277 : : }
278 : : }
279 : :
3119 280 [ + + ]: 310962 : if (gatherstate->need_to_scan_locally)
281 : : {
2180 tgl@sss.pgh.pa.us 282 : 310705 : EState *estate = gatherstate->ps.state;
283 : :
284 : : /* Install our DSA area while executing the plan. */
2309 rhaas@postgresql.org 285 : 310705 : estate->es_query_dsa =
286 [ + + ]: 310705 : gatherstate->pei ? gatherstate->pei->area : NULL;
3119 287 : 310705 : outerTupleSlot = ExecProcNode(outerPlan);
2309 288 : 310705 : estate->es_query_dsa = NULL;
289 : :
3119 290 [ + + + + ]: 310705 : if (!TupIsNull(outerTupleSlot))
291 : 310422 : return outerTupleSlot;
292 : :
293 : 283 : gatherstate->need_to_scan_locally = false;
294 : : }
295 : : }
296 : :
3091 297 : 383 : return ExecClearTuple(fslot);
298 : : }
299 : :
300 : : /*
301 : : * Attempt to read a tuple from one of our parallel workers.
302 : : */
303 : : static MinimalTuple
3082 304 : 1375328 : gather_readnext(GatherState *gatherstate)
305 : : {
2816 tgl@sss.pgh.pa.us 306 : 1375328 : int nvisited = 0;
307 : :
308 : : for (;;)
3082 rhaas@postgresql.org 309 : 562202 : {
310 : : TupleQueueReader *reader;
311 : : MinimalTuple tup;
312 : : bool readerdone;
313 : :
314 : : /* Check for async events, particularly messages from workers. */
2813 tgl@sss.pgh.pa.us 315 [ + + ]: 1937530 : CHECK_FOR_INTERRUPTS();
316 : :
317 : : /*
318 : : * Attempt to read a tuple, but don't block if none is available.
319 : : *
320 : : * Note that TupleQueueReaderNext will just return NULL for a worker
321 : : * which fails to initialize. We'll treat that worker as having
322 : : * produced no tuples; WaitForParallelWorkersToFinish will error out
323 : : * when we get there.
324 : : */
2687 rhaas@postgresql.org 325 [ - + ]: 1937527 : Assert(gatherstate->nextreader < gatherstate->nreaders);
3082 326 : 1937527 : reader = gatherstate->reader[gatherstate->nextreader];
327 : 1937527 : tup = TupleQueueReaderNext(reader, true, &readerdone);
328 : :
329 : : /*
330 : : * If this reader is done, remove it from our working array of active
331 : : * readers. If all readers are done, we're outta here.
332 : : */
333 [ + + ]: 1937527 : if (readerdone)
334 : : {
2816 tgl@sss.pgh.pa.us 335 [ - + ]: 1025 : Assert(!tup);
3082 rhaas@postgresql.org 336 : 1025 : --gatherstate->nreaders;
337 [ + + ]: 1025 : if (gatherstate->nreaders == 0)
338 : : {
2081 akapila@postgresql.o 339 : 377 : ExecShutdownGatherWorkers(gatherstate);
3082 rhaas@postgresql.org 340 : 1375325 : return NULL;
341 : : }
2816 tgl@sss.pgh.pa.us 342 : 648 : memmove(&gatherstate->reader[gatherstate->nextreader],
343 : 648 : &gatherstate->reader[gatherstate->nextreader + 1],
344 : : sizeof(TupleQueueReader *)
345 : 648 : * (gatherstate->nreaders - gatherstate->nextreader));
346 [ + + ]: 648 : if (gatherstate->nextreader >= gatherstate->nreaders)
347 : 193 : gatherstate->nextreader = 0;
3082 rhaas@postgresql.org 348 : 648 : continue;
349 : : }
350 : :
351 : : /* If we got a tuple, return it. */
352 [ + + ]: 1936502 : if (tup)
353 : 1151760 : return tup;
354 : :
355 : : /*
356 : : * Advance nextreader pointer in round-robin fashion. Note that we
357 : : * only reach this code if we weren't able to get a tuple from the
358 : : * current worker. We used to advance the nextreader pointer after
359 : : * every tuple, but it turns out to be much more efficient to keep
360 : : * reading from the same queue until that would require blocking.
361 : : */
2816 tgl@sss.pgh.pa.us 362 : 784742 : gatherstate->nextreader++;
363 [ + + ]: 784742 : if (gatherstate->nextreader >= gatherstate->nreaders)
364 : 226107 : gatherstate->nextreader = 0;
365 : :
366 : : /* Have we visited every (surviving) TupleQueueReader? */
367 : 784742 : nvisited++;
368 [ + + ]: 784742 : if (nvisited >= gatherstate->nreaders)
369 : : {
370 : : /*
371 : : * If (still) running plan locally, return NULL so caller can
372 : : * generate another tuple from the local copy of the plan.
373 : : */
3082 rhaas@postgresql.org 374 [ + + ]: 225668 : if (gatherstate->need_to_scan_locally)
375 : 223188 : return NULL;
376 : :
377 : : /* Nothing to do except wait for developments. */
1969 tmunro@postgresql.or 378 : 2480 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
379 : : WAIT_EVENT_EXECUTE_GATHER);
3082 rhaas@postgresql.org 380 : 2480 : ResetLatch(MyLatch);
2816 tgl@sss.pgh.pa.us 381 : 2480 : nvisited = 0;
382 : : }
383 : : }
384 : : }
385 : :
386 : : /* ----------------------------------------------------------------
387 : : * ExecShutdownGatherWorkers
388 : : *
389 : : * Stop all the parallel workers.
390 : : * ----------------------------------------------------------------
391 : : */
392 : : static void
3089 rhaas@postgresql.org 393 : 1284 : ExecShutdownGatherWorkers(GatherState *node)
394 : : {
3103 395 [ + + ]: 1284 : if (node->pei != NULL)
396 : 757 : ExecParallelFinish(node->pei);
397 : :
398 : : /* Flush local copy of reader array */
2417 tgl@sss.pgh.pa.us 399 [ + + ]: 1284 : if (node->reader)
400 : 377 : pfree(node->reader);
401 : 1284 : node->reader = NULL;
3089 rhaas@postgresql.org 402 : 1284 : }
403 : :
404 : : /* ----------------------------------------------------------------
405 : : * ExecShutdownGather
406 : : *
407 : : * Destroy the setup for parallel workers including parallel context.
408 : : * ----------------------------------------------------------------
409 : : */
410 : : void
411 : 757 : ExecShutdownGather(GatherState *node)
412 : : {
413 : 757 : ExecShutdownGatherWorkers(node);
414 : :
415 : : /* Now destroy the parallel context. */
416 [ + + ]: 757 : if (node->pei != NULL)
417 : : {
3103 418 : 266 : ExecParallelCleanup(node->pei);
419 : 266 : node->pei = NULL;
420 : : }
3119 421 : 757 : }
422 : :
423 : : /* ----------------------------------------------------------------
424 : : * Join Support
425 : : * ----------------------------------------------------------------
426 : : */
427 : :
428 : : /* ----------------------------------------------------------------
429 : : * ExecReScanGather
430 : : *
431 : : * Prepare to re-scan the result of a Gather.
432 : : * ----------------------------------------------------------------
433 : : */
434 : : void
435 : 150 : ExecReScanGather(GatherState *node)
436 : : {
2419 tgl@sss.pgh.pa.us 437 : 150 : Gather *gather = (Gather *) node->ps.plan;
438 : 150 : PlanState *outerPlan = outerPlanState(node);
439 : :
440 : : /* Make sure any existing workers are gracefully shut down */
3089 rhaas@postgresql.org 441 : 150 : ExecShutdownGatherWorkers(node);
442 : :
443 : : /* Mark node so that shared state will be rebuilt at next call */
3103 444 : 150 : node->initialized = false;
445 : :
446 : : /*
447 : : * Set child node's chgParam to tell it that the next scan might deliver a
448 : : * different set of rows within the leader process. (The overall rowset
449 : : * shouldn't change, but the leader process's subset might; hence nodes
450 : : * between here and the parallel table scan node mustn't optimize on the
451 : : * assumption of an unchanging rowset.)
452 : : */
2419 tgl@sss.pgh.pa.us 453 [ + - ]: 150 : if (gather->rescan_param >= 0)
454 : 150 : outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
455 : : gather->rescan_param);
456 : :
457 : : /*
458 : : * If chgParam of subnode is not null then plan will be re-scanned by
459 : : * first ExecProcNode. Note: because this does nothing if we have a
460 : : * rescan_param, it's currently guaranteed that parallel-aware child nodes
461 : : * will not see a ReScan call until after they get a ReInitializeDSM call.
462 : : * That ordering might not be something to rely on, though. A good rule
463 : : * of thumb is that ReInitializeDSM should reset only shared state, ReScan
464 : : * should reset only local state, and anything that depends on both of
465 : : * those steps being finished must wait until the first ExecProcNode call.
466 : : */
467 [ - + ]: 150 : if (outerPlan->chgParam == NULL)
2419 tgl@sss.pgh.pa.us 468 :UBC 0 : ExecReScan(outerPlan);
3119 rhaas@postgresql.org 469 :CBC 150 : }
|