TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeAppend.c
4 : * routines to handle append nodes.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitAppend - initialize the append node
17 : * ExecAppend - retrieve the next tuple from the node
18 : * ExecEndAppend - shut down the append node
19 : * ExecReScanAppend - rescan the append node
20 : *
21 : * NOTES
22 : * Each append node contains a list of one or more subplans which
23 : * must be iteratively processed (forwards or backwards).
24 : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : * until the subplan stops returning tuples, at which point that
26 : * plan is shut down and the next started up.
27 : *
28 : * Append nodes don't make use of their left and right
29 : * subtrees, rather they maintain a list of subplans so
30 : * a typical append node looks like this in the plan tree:
31 : *
32 : * ...
33 : * /
34 : * Append -------+------+------+--- nil
35 : * / \ | | |
36 : * nil nil ... ... ...
37 : * subplans
38 : *
39 : * Append nodes are currently used for unions, and to support
40 : * inheritance queries, where several relations need to be scanned.
41 : * For example, in our standard person/student/employee/student-emp
42 : * example, where student and employee inherit from person
43 : * and student-emp inherits from student and employee, the
44 : * query:
45 : *
46 : * select name from person
47 : *
48 : * generates the plan:
49 : *
50 : * |
51 : * Append -------+-------+--------+--------+
52 : * / \ | | | |
53 : * nil nil Scan Scan Scan Scan
54 : * | | | |
55 : * person employee student student-emp
56 : */
57 :
58 : #include "postgres.h"
59 :
60 : #include "executor/execAsync.h"
61 : #include "executor/execdebug.h"
62 : #include "executor/execPartition.h"
63 : #include "executor/nodeAppend.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "storage/latch.h"
67 :
68 : /* Shared state for parallel-aware Append. */
69 : struct ParallelAppendState
70 : {
71 : LWLock pa_lock; /* mutual exclusion to choose next subplan */
72 : int pa_next_plan; /* next plan to choose by any worker */
73 :
74 : /*
75 : * pa_finished[i] should be true if no more workers should select subplan
76 : * i. for a non-partial plan, this should be set to true as soon as a
77 : * worker selects the plan; for a partial plan, it remains false until
78 : * some worker executes the plan to completion.
79 : */
80 : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
81 : };
82 :
83 : #define INVALID_SUBPLAN_INDEX -1
84 : #define EVENT_BUFFER_SIZE 16
85 :
86 : static TupleTableSlot *ExecAppend(PlanState *pstate);
87 : static bool choose_next_subplan_locally(AppendState *node);
88 : static bool choose_next_subplan_for_leader(AppendState *node);
89 : static bool choose_next_subplan_for_worker(AppendState *node);
90 : static void mark_invalid_subplans_as_finished(AppendState *node);
91 : static void ExecAppendAsyncBegin(AppendState *node);
92 : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 : static void ExecAppendAsyncEventWait(AppendState *node);
95 : static void classify_matching_subplans(AppendState *node);
96 :
97 : /* ----------------------------------------------------------------
98 : * ExecInitAppend
99 : *
100 : * Begin all of the subscans of the append node.
101 : *
102 : * (This is potentially wasteful, since the entire result of the
103 : * append node may not be scanned, but this way all of the
104 : * structures get allocated in the executor's top level memory
105 : * block instead of that of the call to ExecAppend.)
106 : * ----------------------------------------------------------------
107 : */
108 : AppendState *
109 CBC 6518 : ExecInitAppend(Append *node, EState *estate, int eflags)
110 : {
111 6518 : AppendState *appendstate = makeNode(AppendState);
112 : PlanState **appendplanstates;
113 : Bitmapset *validsubplans;
114 : Bitmapset *asyncplans;
115 : int nplans;
116 : int nasyncplans;
117 : int firstvalid;
118 : int i,
119 : j;
120 :
121 : /* check for unsupported flags */
122 6518 : Assert(!(eflags & EXEC_FLAG_MARK));
123 :
124 : /*
125 : * create new AppendState for our append node
126 : */
127 6518 : appendstate->ps.plan = (Plan *) node;
128 6518 : appendstate->ps.state = estate;
129 6518 : appendstate->ps.ExecProcNode = ExecAppend;
130 :
131 : /* Let choose_next_subplan_* function handle setting the first subplan */
132 6518 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
133 6518 : appendstate->as_syncdone = false;
134 6518 : appendstate->as_begun = false;
135 :
136 : /* If run-time partition pruning is enabled, then set that up now */
137 GNC 6518 : if (node->part_prune_index >= 0)
138 : {
139 : PartitionPruneState *prunestate;
140 :
141 : /*
142 : * Set up pruning data structure. This also initializes the set of
143 : * subplans to initialize (validsubplans) by taking into account the
144 : * result of performing initial pruning if any.
145 : */
146 CBC 275 : prunestate = ExecInitPartitionPruning(&appendstate->ps,
147 275 : list_length(node->appendplans),
148 : node->part_prune_index,
149 : node->apprelids,
150 : &validsubplans);
151 GIC 275 : appendstate->as_prune_state = prunestate;
152 CBC 275 : nplans = bms_num_members(validsubplans);
153 ECB :
154 : /*
155 : * When no run-time pruning is required and there's at least one
156 : * subplan, we can fill as_valid_subplans immediately, preventing
157 : * later calls to ExecFindMatchingSubPlans.
158 : */
159 GIC 275 : if (!prunestate->do_exec_prune && nplans > 0)
160 : {
161 CBC 70 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
162 GNC 70 : appendstate->as_valid_subplans_identified = true;
163 : }
164 : }
165 ECB : else
166 : {
167 GIC 6243 : nplans = list_length(node->appendplans);
168 :
169 : /*
170 : * When run-time partition pruning is not enabled we can just mark all
171 ECB : * subplans as valid; they must also all be initialized.
172 : */
173 GIC 6243 : Assert(nplans > 0);
174 6243 : appendstate->as_valid_subplans = validsubplans =
175 6243 : bms_add_range(NULL, 0, nplans - 1);
176 GNC 6243 : appendstate->as_valid_subplans_identified = true;
177 GIC 6243 : appendstate->as_prune_state = NULL;
178 ECB : }
179 :
180 : /*
181 : * Initialize result tuple type and slot.
182 : */
183 GIC 6518 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
184 :
185 : /* node returns slots from each of its subnodes, therefore not fixed */
186 6518 : appendstate->ps.resultopsset = true;
187 6518 : appendstate->ps.resultopsfixed = false;
188 ECB :
189 GIC 6518 : appendplanstates = (PlanState **) palloc(nplans *
190 : sizeof(PlanState *));
191 ECB :
192 : /*
193 : * call ExecInitNode on each of the valid plans to be executed and save
194 : * the results into the appendplanstates array.
195 : *
196 : * While at it, find out the first valid partial plan.
197 : */
198 GIC 6518 : j = 0;
199 6518 : asyncplans = NULL;
200 6518 : nasyncplans = 0;
201 6518 : firstvalid = nplans;
202 6518 : i = -1;
203 CBC 25418 : while ((i = bms_next_member(validsubplans, i)) >= 0)
204 ECB : {
205 CBC 18900 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
206 ECB :
207 : /*
208 : * Record async subplans. When executing EvalPlanQual, we treat them
209 : * as sync ones; don't do this when initializing an EvalPlanQual plan
210 : * tree.
211 : */
212 GIC 18900 : if (initNode->async_capable && estate->es_epq_active == NULL)
213 : {
214 90 : asyncplans = bms_add_member(asyncplans, j);
215 90 : nasyncplans++;
216 : }
217 ECB :
218 : /*
219 : * Record the lowest appendplans index which is a valid partial plan.
220 : */
221 GIC 18900 : if (i >= node->first_partial_plan && j < firstvalid)
222 221 : firstvalid = j;
223 :
224 18900 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
225 : }
226 ECB :
227 CBC 6518 : appendstate->as_first_partial_plan = firstvalid;
228 GIC 6518 : appendstate->appendplans = appendplanstates;
229 CBC 6518 : appendstate->as_nplans = nplans;
230 :
231 : /* Initialize async state */
232 6518 : appendstate->as_asyncplans = asyncplans;
233 6518 : appendstate->as_nasyncplans = nasyncplans;
234 6518 : appendstate->as_asyncrequests = NULL;
235 GIC 6518 : appendstate->as_asyncresults = NULL;
236 6518 : appendstate->as_nasyncresults = 0;
237 CBC 6518 : appendstate->as_nasyncremain = 0;
238 6518 : appendstate->as_needrequest = NULL;
239 6518 : appendstate->as_eventset = NULL;
240 6518 : appendstate->as_valid_asyncplans = NULL;
241 ECB :
242 CBC 6518 : if (nasyncplans > 0)
243 ECB : {
244 CBC 46 : appendstate->as_asyncrequests = (AsyncRequest **)
245 46 : palloc0(nplans * sizeof(AsyncRequest *));
246 :
247 46 : i = -1;
248 GIC 136 : while ((i = bms_next_member(asyncplans, i)) >= 0)
249 ECB : {
250 : AsyncRequest *areq;
251 :
252 CBC 90 : areq = palloc(sizeof(AsyncRequest));
253 90 : areq->requestor = (PlanState *) appendstate;
254 GIC 90 : areq->requestee = appendplanstates[i];
255 90 : areq->request_index = i;
256 90 : areq->callback_pending = false;
257 CBC 90 : areq->request_complete = false;
258 90 : areq->result = NULL;
259 ECB :
260 CBC 90 : appendstate->as_asyncrequests[i] = areq;
261 ECB : }
262 :
263 CBC 46 : appendstate->as_asyncresults = (TupleTableSlot **)
264 GIC 46 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
265 ECB :
266 GNC 46 : if (appendstate->as_valid_subplans_identified)
267 GIC 43 : classify_matching_subplans(appendstate);
268 ECB : }
269 :
270 : /*
271 : * Miscellaneous initialization
272 : */
273 :
274 GIC 6518 : appendstate->ps.ps_ProjInfo = NULL;
275 :
276 : /* For parallel query, this will be overridden later. */
277 6518 : appendstate->choose_next_subplan = choose_next_subplan_locally;
278 :
279 CBC 6518 : return appendstate;
280 : }
281 :
282 ECB : /* ----------------------------------------------------------------
283 : * ExecAppend
284 : *
285 : * Handles iteration over multiple subplans.
286 : * ----------------------------------------------------------------
287 : */
288 : static TupleTableSlot *
289 GIC 1774005 : ExecAppend(PlanState *pstate)
290 : {
291 1774005 : AppendState *node = castNode(AppendState, pstate);
292 : TupleTableSlot *result;
293 :
294 ECB : /*
295 : * If this is the first call after Init or ReScan, we need to do the
296 : * initialization work.
297 : */
298 GIC 1774005 : if (!node->as_begun)
299 : {
300 46334 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
301 46334 : Assert(!node->as_syncdone);
302 :
303 ECB : /* Nothing to do if there are no subplans */
304 GIC 46334 : if (node->as_nplans == 0)
305 CBC 18 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
306 ECB :
307 : /* If there are any async subplans, begin executing them. */
308 GIC 46316 : if (node->as_nasyncplans > 0)
309 CBC 36 : ExecAppendAsyncBegin(node);
310 ECB :
311 : /*
312 : * If no sync subplan has been chosen, we must choose one before
313 : * proceeding.
314 : */
315 GIC 46316 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
316 1610 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
317 :
318 44706 : Assert(node->as_syncdone ||
319 : (node->as_whichplan >= 0 &&
320 ECB : node->as_whichplan < node->as_nplans));
321 :
322 : /* And we're initialized. */
323 CBC 44706 : node->as_begun = true;
324 : }
325 :
326 : for (;;)
327 GIC 48084 : {
328 ECB : PlanState *subnode;
329 :
330 GIC 1820461 : CHECK_FOR_INTERRUPTS();
331 :
332 ECB : /*
333 : * try to get a tuple from an async subplan if any
334 : */
335 CBC 1820461 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
336 : {
337 GIC 5736 : if (ExecAppendAsyncGetNext(node, &result))
338 5736 : return result;
339 UIC 0 : Assert(!node->as_syncdone);
340 LBC 0 : Assert(bms_is_empty(node->as_needrequest));
341 : }
342 ECB :
343 : /*
344 EUB : * figure out which sync subplan we are currently processing
345 : */
346 GIC 1814725 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
347 1814725 : subnode = node->appendplans[node->as_whichplan];
348 :
349 : /*
350 : * get a tuple from the subplan
351 ECB : */
352 CBC 1814725 : result = ExecProcNode(subnode);
353 :
354 GIC 1814699 : if (!TupIsNull(result))
355 : {
356 : /*
357 ECB : * If the subplan gave us something then return it as-is. We do
358 : * NOT make use of the result slot that was set up in
359 : * ExecInitAppend; there's no need for it.
360 : */
361 GIC 1722185 : return result;
362 : }
363 :
364 : /*
365 : * wait or poll for async events if any. We do this before checking
366 ECB : * for the end of iteration, because it might drain the remaining
367 : * async subplans.
368 : */
369 GIC 92514 : if (node->as_nasyncremain > 0)
370 17 : ExecAppendAsyncEventWait(node);
371 :
372 : /* choose new sync subplan; if no sync/async subplans, we're done */
373 92514 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
374 CBC 44430 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
375 ECB : }
376 : }
377 :
378 : /* ----------------------------------------------------------------
379 : * ExecEndAppend
380 : *
381 : * Shuts down the subscans of the append node.
382 : *
383 : * Returns nothing of interest.
384 : * ----------------------------------------------------------------
385 : */
386 : void
387 GIC 6379 : ExecEndAppend(AppendState *node)
388 : {
389 : PlanState **appendplans;
390 : int nplans;
391 : int i;
392 ECB :
393 : /*
394 : * get information from the node
395 : */
396 GIC 6379 : appendplans = node->appendplans;
397 6379 : nplans = node->as_nplans;
398 :
399 : /*
400 : * shut down each of the subscans
401 ECB : */
402 CBC 24956 : for (i = 0; i < nplans; i++)
403 GIC 18577 : ExecEndNode(appendplans[i]);
404 6379 : }
405 :
406 : void
407 CBC 41962 : ExecReScanAppend(AppendState *node)
408 ECB : {
409 CBC 41962 : int nasyncplans = node->as_nasyncplans;
410 : int i;
411 :
412 ECB : /*
413 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
414 : * we'd better unset the valid subplans so that they are reselected for
415 : * the new parameter values.
416 : */
417 GIC 43596 : if (node->as_prune_state &&
418 1634 : bms_overlap(node->ps.chgParam,
419 1634 : node->as_prune_state->execparamids))
420 : {
421 GNC 1634 : node->as_valid_subplans_identified = false;
422 GIC 1634 : bms_free(node->as_valid_subplans);
423 CBC 1634 : node->as_valid_subplans = NULL;
424 GNC 1634 : bms_free(node->as_valid_asyncplans);
425 1634 : node->as_valid_asyncplans = NULL;
426 ECB : }
427 :
428 CBC 137173 : for (i = 0; i < node->as_nplans; i++)
429 : {
430 GIC 95211 : PlanState *subnode = node->appendplans[i];
431 ECB :
432 : /*
433 : * ExecReScan doesn't know about my subplans, so I have to do
434 : * changed-parameter signaling myself.
435 : */
436 GIC 95211 : if (node->ps.chgParam != NULL)
437 92940 : UpdateChangedParamSet(subnode, node->ps.chgParam);
438 :
439 ECB : /*
440 : * If chgParam of subnode is not null then plan will be re-scanned by
441 : * first ExecProcNode or by first ExecAsyncRequest.
442 : */
443 GIC 95211 : if (subnode->chgParam == NULL)
444 40479 : ExecReScan(subnode);
445 : }
446 ECB :
447 : /* Reset async state */
448 GIC 41962 : if (nasyncplans > 0)
449 : {
450 17 : i = -1;
451 CBC 51 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
452 : {
453 34 : AsyncRequest *areq = node->as_asyncrequests[i];
454 ECB :
455 GIC 34 : areq->callback_pending = false;
456 CBC 34 : areq->request_complete = false;
457 GIC 34 : areq->result = NULL;
458 ECB : }
459 :
460 CBC 17 : node->as_nasyncresults = 0;
461 GIC 17 : node->as_nasyncremain = 0;
462 17 : bms_free(node->as_needrequest);
463 CBC 17 : node->as_needrequest = NULL;
464 ECB : }
465 :
466 : /* Let choose_next_subplan_* function handle setting the first subplan */
467 GIC 41962 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
468 41962 : node->as_syncdone = false;
469 41962 : node->as_begun = false;
470 CBC 41962 : }
471 ECB :
472 : /* ----------------------------------------------------------------
473 : * Parallel Append Support
474 : * ----------------------------------------------------------------
475 : */
476 :
477 : /* ----------------------------------------------------------------
478 : * ExecAppendEstimate
479 : *
480 : * Compute the amount of space we'll need in the parallel
481 : * query DSM, and inform pcxt->estimator about our needs.
482 : * ----------------------------------------------------------------
483 : */
484 : void
485 GIC 66 : ExecAppendEstimate(AppendState *node,
486 : ParallelContext *pcxt)
487 : {
488 CBC 66 : node->pstate_len =
489 GIC 66 : add_size(offsetof(ParallelAppendState, pa_finished),
490 66 : sizeof(bool) * node->as_nplans);
491 ECB :
492 CBC 66 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
493 66 : shm_toc_estimate_keys(&pcxt->estimator, 1);
494 GIC 66 : }
495 ECB :
496 :
497 : /* ----------------------------------------------------------------
498 : * ExecAppendInitializeDSM
499 : *
500 : * Set up shared state for Parallel Append.
501 : * ----------------------------------------------------------------
502 : */
503 : void
504 GIC 66 : ExecAppendInitializeDSM(AppendState *node,
505 : ParallelContext *pcxt)
506 : {
507 ECB : ParallelAppendState *pstate;
508 :
509 GIC 66 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
510 66 : memset(pstate, 0, node->pstate_len);
511 66 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
512 CBC 66 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
513 ECB :
514 CBC 66 : node->as_pstate = pstate;
515 66 : node->choose_next_subplan = choose_next_subplan_for_leader;
516 GIC 66 : }
517 ECB :
518 : /* ----------------------------------------------------------------
519 : * ExecAppendReInitializeDSM
520 : *
521 : * Reset shared state before beginning a fresh scan.
522 : * ----------------------------------------------------------------
523 : */
524 : void
525 UIC 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
526 : {
527 0 : ParallelAppendState *pstate = node->as_pstate;
528 EUB :
529 UIC 0 : pstate->pa_next_plan = 0;
530 UBC 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
531 UIC 0 : }
532 EUB :
533 : /* ----------------------------------------------------------------
534 : * ExecAppendInitializeWorker
535 : *
536 : * Copy relevant information from TOC into planstate, and initialize
537 : * whatever is required to choose and execute the optimal subplan.
538 : * ----------------------------------------------------------------
539 : */
540 : void
541 GIC 152 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
542 : {
543 152 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
544 CBC 152 : node->choose_next_subplan = choose_next_subplan_for_worker;
545 GIC 152 : }
546 ECB :
547 : /* ----------------------------------------------------------------
548 : * choose_next_subplan_locally
549 : *
550 : * Choose next sync subplan for a non-parallel-aware Append,
551 : * returning false if there are no more.
552 : * ----------------------------------------------------------------
553 : */
554 : static bool
555 GIC 138372 : choose_next_subplan_locally(AppendState *node)
556 : {
557 138372 : int whichplan = node->as_whichplan;
558 ECB : int nextplan;
559 :
560 : /* We should never be called when there are no subplans */
561 GIC 138372 : Assert(node->as_nplans > 0);
562 :
563 : /* Nothing to do if syncdone */
564 CBC 138372 : if (node->as_syncdone)
565 GIC 17 : return false;
566 :
567 ECB : /*
568 : * If first call then have the bms member function choose the first valid
569 : * sync subplan by initializing whichplan to -1. If there happen to be no
570 : * valid sync subplans then the bms member function will handle that by
571 : * returning a negative number which will allow us to exit returning a
572 : * false value.
573 : */
574 GIC 138355 : if (whichplan == INVALID_SUBPLAN_INDEX)
575 : {
576 46096 : if (node->as_nasyncplans > 0)
577 ECB : {
578 : /* We'd have filled as_valid_subplans already */
579 GNC 19 : Assert(node->as_valid_subplans_identified);
580 : }
581 46077 : else if (!node->as_valid_subplans_identified)
582 : {
583 CBC 1691 : node->as_valid_subplans =
584 GIC 1691 : ExecFindMatchingSubPlans(node->as_prune_state, false);
585 GNC 1691 : node->as_valid_subplans_identified = true;
586 : }
587 ECB :
588 GIC 46096 : whichplan = -1;
589 ECB : }
590 :
591 : /* Ensure whichplan is within the expected range */
592 GIC 138355 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
593 :
594 CBC 138355 : if (ScanDirectionIsForward(node->ps.state->es_direction))
595 GIC 138346 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
596 : else
597 9 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
598 ECB :
599 GIC 138355 : if (nextplan < 0)
600 ECB : {
601 : /* Set as_syncdone if in async mode */
602 GIC 45853 : if (node->as_nasyncplans > 0)
603 CBC 17 : node->as_syncdone = true;
604 GIC 45853 : return false;
605 ECB : }
606 :
607 GIC 92502 : node->as_whichplan = nextplan;
608 ECB :
609 CBC 92502 : return true;
610 ECB : }
611 :
612 : /* ----------------------------------------------------------------
613 : * choose_next_subplan_for_leader
614 : *
615 : * Try to pick a plan which doesn't commit us to doing much
616 : * work locally, so that as much work as possible is done in
617 : * the workers. Cheapest subplans are at the end.
618 : * ----------------------------------------------------------------
619 : */
620 : static bool
621 GIC 241 : choose_next_subplan_for_leader(AppendState *node)
622 : {
623 241 : ParallelAppendState *pstate = node->as_pstate;
624 :
625 : /* Backward scan is not supported by parallel-aware plans */
626 241 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
627 ECB :
628 : /* We should never be called when there are no subplans */
629 CBC 241 : Assert(node->as_nplans > 0);
630 :
631 GIC 241 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
632 ECB :
633 GIC 241 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
634 : {
635 ECB : /* Mark just-completed subplan as finished. */
636 GIC 184 : node->as_pstate->pa_finished[node->as_whichplan] = true;
637 ECB : }
638 : else
639 : {
640 : /* Start with last subplan. */
641 GIC 57 : node->as_whichplan = node->as_nplans - 1;
642 ECB :
643 : /*
644 : * If we've yet to determine the valid subplans then do so now. If
645 : * run-time pruning is disabled then the valid subplans will always be
646 : * set to all subplans.
647 : */
648 GNC 57 : if (!node->as_valid_subplans_identified)
649 : {
650 GIC 12 : node->as_valid_subplans =
651 12 : ExecFindMatchingSubPlans(node->as_prune_state, false);
652 GNC 12 : node->as_valid_subplans_identified = true;
653 :
654 : /*
655 ECB : * Mark each invalid plan as finished to allow the loop below to
656 : * select the first valid subplan.
657 : */
658 CBC 12 : mark_invalid_subplans_as_finished(node);
659 ECB : }
660 : }
661 :
662 : /* Loop until we find a subplan to execute. */
663 GIC 388 : while (pstate->pa_finished[node->as_whichplan])
664 : {
665 CBC 204 : if (node->as_whichplan == 0)
666 : {
667 GIC 57 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
668 57 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
669 57 : LWLockRelease(&pstate->pa_lock);
670 CBC 57 : return false;
671 : }
672 ECB :
673 : /*
674 : * We needn't pay attention to as_valid_subplans here as all invalid
675 : * plans have been marked as finished.
676 : */
677 CBC 147 : node->as_whichplan--;
678 : }
679 :
680 : /* If non-partial, immediately mark as finished. */
681 GIC 184 : if (node->as_whichplan < node->as_first_partial_plan)
682 61 : node->as_pstate->pa_finished[node->as_whichplan] = true;
683 :
684 CBC 184 : LWLockRelease(&pstate->pa_lock);
685 :
686 GIC 184 : return true;
687 : }
688 ECB :
689 : /* ----------------------------------------------------------------
690 : * choose_next_subplan_for_worker
691 : *
692 : * Choose next subplan for a parallel-aware Append, returning
693 : * false if there are no more.
694 : *
695 : * We start from the first plan and advance through the list;
696 : * when we get back to the end, we loop back to the first
697 : * partial plan. This assigns the non-partial plans first in
698 : * order of descending cost and then spreads out the workers
699 : * as evenly as possible across the remaining partial plans.
700 : * ----------------------------------------------------------------
701 : */
702 : static bool
703 GIC 217 : choose_next_subplan_for_worker(AppendState *node)
704 : {
705 217 : ParallelAppendState *pstate = node->as_pstate;
706 :
707 : /* Backward scan is not supported by parallel-aware plans */
708 217 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
709 :
710 ECB : /* We should never be called when there are no subplans */
711 GIC 217 : Assert(node->as_nplans > 0);
712 ECB :
713 GIC 217 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
714 :
715 ECB : /* Mark just-completed subplan as finished. */
716 GIC 217 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
717 71 : node->as_pstate->pa_finished[node->as_whichplan] = true;
718 ECB :
719 : /*
720 : * If we've yet to determine the valid subplans then do so now. If
721 : * run-time pruning is disabled then the valid subplans will always be set
722 : * to all subplans.
723 : */
724 GNC 146 : else if (!node->as_valid_subplans_identified)
725 : {
726 GIC 23 : node->as_valid_subplans =
727 23 : ExecFindMatchingSubPlans(node->as_prune_state, false);
728 GNC 23 : node->as_valid_subplans_identified = true;
729 :
730 GIC 23 : mark_invalid_subplans_as_finished(node);
731 : }
732 :
733 ECB : /* If all the plans are already done, we have nothing to do */
734 GIC 217 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
735 ECB : {
736 CBC 121 : LWLockRelease(&pstate->pa_lock);
737 121 : return false;
738 : }
739 ECB :
740 : /* Save the plan from which we are starting the search. */
741 GIC 96 : node->as_whichplan = pstate->pa_next_plan;
742 :
743 ECB : /* Loop until we find a valid subplan to execute. */
744 GIC 203 : while (pstate->pa_finished[pstate->pa_next_plan])
745 ECB : {
746 : int nextplan;
747 :
748 GIC 132 : nextplan = bms_next_member(node->as_valid_subplans,
749 : pstate->pa_next_plan);
750 CBC 132 : if (nextplan >= 0)
751 : {
752 : /* Advance to the next valid plan. */
753 99 : pstate->pa_next_plan = nextplan;
754 : }
755 GIC 33 : else if (node->as_whichplan > node->as_first_partial_plan)
756 : {
757 ECB : /*
758 : * Try looping back to the first valid partial plan, if there is
759 : * one. If there isn't, arrange to bail out below.
760 : */
761 GIC 18 : nextplan = bms_next_member(node->as_valid_subplans,
762 CBC 18 : node->as_first_partial_plan - 1);
763 GIC 18 : pstate->pa_next_plan =
764 CBC 18 : nextplan < 0 ? node->as_whichplan : nextplan;
765 : }
766 : else
767 : {
768 : /*
769 : * At last plan, and either there are no partial plans or we've
770 ECB : * tried them all. Arrange to bail out.
771 : */
772 CBC 15 : pstate->pa_next_plan = node->as_whichplan;
773 ECB : }
774 :
775 GIC 132 : if (pstate->pa_next_plan == node->as_whichplan)
776 : {
777 : /* We've tried everything! */
778 25 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
779 25 : LWLockRelease(&pstate->pa_lock);
780 25 : return false;
781 ECB : }
782 : }
783 :
784 : /* Pick the plan we found, and advance pa_next_plan one more time. */
785 GIC 71 : node->as_whichplan = pstate->pa_next_plan;
786 71 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
787 ECB : pstate->pa_next_plan);
788 :
789 : /*
790 : * If there are no more valid plans then try setting the next plan to the
791 : * first valid partial plan.
792 : */
793 GIC 71 : if (pstate->pa_next_plan < 0)
794 ECB : {
795 CBC 17 : int nextplan = bms_next_member(node->as_valid_subplans,
796 GIC 17 : node->as_first_partial_plan - 1);
797 :
798 17 : if (nextplan >= 0)
799 17 : pstate->pa_next_plan = nextplan;
800 : else
801 : {
802 ECB : /*
803 : * There are no valid partial plans, and we already chose the last
804 : * non-partial plan; so flag that there's nothing more for our
805 : * fellow workers to do.
806 : */
807 LBC 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
808 ECB : }
809 : }
810 :
811 : /* If non-partial, immediately mark as finished. */
812 GIC 71 : if (node->as_whichplan < node->as_first_partial_plan)
813 8 : node->as_pstate->pa_finished[node->as_whichplan] = true;
814 :
815 71 : LWLockRelease(&pstate->pa_lock);
816 EUB :
817 GIC 71 : return true;
818 : }
819 :
820 : /*
821 ECB : * mark_invalid_subplans_as_finished
822 : * Marks the ParallelAppendState's pa_finished as true for each invalid
823 : * subplan.
824 : *
825 : * This function should only be called for parallel Append with run-time
826 : * pruning enabled.
827 : */
828 : static void
829 GIC 35 : mark_invalid_subplans_as_finished(AppendState *node)
830 : {
831 : int i;
832 :
833 : /* Only valid to call this while in parallel Append mode */
834 35 : Assert(node->as_pstate);
835 :
836 : /* Shouldn't have been called when run-time pruning is not enabled */
837 35 : Assert(node->as_prune_state);
838 ECB :
839 : /* Nothing to do if all plans are valid */
840 GIC 35 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
841 UIC 0 : return;
842 :
843 ECB : /* Mark all non-valid plans as finished */
844 GIC 113 : for (i = 0; i < node->as_nplans; i++)
845 : {
846 CBC 78 : if (!bms_is_member(i, node->as_valid_subplans))
847 GIC 35 : node->as_pstate->pa_finished[i] = true;
848 : }
849 ECB : }
850 EUB :
851 : /* ----------------------------------------------------------------
852 : * Asynchronous Append Support
853 ECB : * ----------------------------------------------------------------
854 : */
855 :
856 : /* ----------------------------------------------------------------
857 : * ExecAppendAsyncBegin
858 : *
859 : * Begin executing designed async-capable subplans.
860 : * ----------------------------------------------------------------
861 : */
862 : static void
863 GIC 36 : ExecAppendAsyncBegin(AppendState *node)
864 : {
865 : int i;
866 :
867 : /* Backward scan is not supported by async-aware Appends. */
868 36 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
869 :
870 : /* We should never be called when there are no subplans */
871 36 : Assert(node->as_nplans > 0);
872 ECB :
873 : /* We should never be called when there are no async subplans. */
874 GIC 36 : Assert(node->as_nasyncplans > 0);
875 :
876 : /* If we've yet to determine the valid subplans then do so now. */
877 GNC 36 : if (!node->as_valid_subplans_identified)
878 : {
879 GIC 2 : node->as_valid_subplans =
880 CBC 2 : ExecFindMatchingSubPlans(node->as_prune_state, false);
881 GNC 2 : node->as_valid_subplans_identified = true;
882 :
883 GIC 2 : classify_matching_subplans(node);
884 ECB : }
885 :
886 : /* Initialize state variables. */
 : /* After classify_matching_subplans, as_valid_subplans holds sync
 : * subplans only, so an empty set means the sync side is already done. */
887 CBC 36 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
888 GIC 36 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
889 ECB :
890 : /* Nothing to do if there are no valid async subplans. */
891 CBC 36 : if (node->as_nasyncremain == 0)
892 UIC 0 : return;
893 ECB :
894 : /* Make a request for each of the valid async subplans. */
 : /* NOTE(review): ExecAsyncRequest may complete a request synchronously,
 : * delivering its result through ExecAsyncAppendResponse — confirm. */
895 GIC 36 : i = -1;
896 105 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
897 ECB : {
898 CBC 69 : AsyncRequest *areq = node->as_asyncrequests[i];
899 :
900 GIC 69 : Assert(areq->request_index == i);
901 CBC 69 : Assert(!areq->callback_pending);
902 EUB :
903 : /* Do the actual work. */
904 GIC 69 : ExecAsyncRequest(areq);
905 ECB : }
906 : }
907 :
908 : /* ----------------------------------------------------------------
909 : * ExecAppendAsyncGetNext
910 : *
911 : * Get the next tuple from any of the asynchronous subplans.
912 : * ----------------------------------------------------------------
913 : */
914 : static bool
915 GIC 5736 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
916 : {
917 5736 : *result = NULL;
918 :
919 : /* We should never be called when there are no valid async subplans. */
920 5736 : Assert(node->as_nasyncremain > 0);
921 :
922 : /* Request a tuple asynchronously. */
923 5736 : if (ExecAppendAsyncRequest(node, result))
924 5667 : return true;
925 ECB :
926 GIC 102 : while (node->as_nasyncremain > 0)
927 ECB : {
928 GIC 72 : CHECK_FOR_INTERRUPTS();
929 :
930 ECB : /* Wait or poll for async events. */
931 GIC 72 : ExecAppendAsyncEventWait(node);
932 :
933 ECB : /* Request a tuple asynchronously. */
934 CBC 72 : if (ExecAppendAsyncRequest(node, result))
935 GIC 39 : return true;
936 ECB :
937 : /* Break from loop if there's any sync subplan that isn't complete. */
938 CBC 33 : if (!node->as_syncdone)
939 UIC 0 : break;
940 : }
941 ECB :
942 : /*
943 : * If all sync subplans are complete, we're totally done scanning the
944 : * given node. Otherwise, we're done with the asynchronous stuff but must
945 : * continue scanning the sync subplans.
946 : */
947 GIC 30 : if (node->as_syncdone)
948 ECB : {
949 GBC 30 : Assert(node->as_nasyncremain == 0);
 : /* Return an empty slot with "true"; NOTE(review): the caller
 : * presumably detects end-of-scan via TupIsNull — confirm. */
950 GIC 30 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
951 30 : return true;
952 : }
953 :
954 UIC 0 : return false;
955 : }
956 :
957 ECB : /* ----------------------------------------------------------------
958 : * ExecAppendAsyncRequest
959 : *
960 : * Request a tuple asynchronously.
961 : * ----------------------------------------------------------------
962 : */
963 : static bool
964 GBC 5808 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
965 : {
966 : Bitmapset *needrequest;
967 : int i;
968 :
969 : /* Nothing to do if there are no async subplans needing a new request. */
970 GIC 5808 : if (bms_is_empty(node->as_needrequest))
971 : {
972 49 : Assert(node->as_nasyncresults == 0);
973 49 : return false;
974 ECB : }
975 :
976 : /*
977 : * If there are any asynchronously-generated results that have not yet
978 : * been returned, we have nothing to do; just return one of them.
979 : */
980 CBC 5759 : if (node->as_nasyncresults > 0)
981 : {
 : /* Pop the most recently stored result (as_asyncresults is LIFO). */
982 2493 : --node->as_nasyncresults;
983 2493 : *result = node->as_asyncresults[node->as_nasyncresults];
984 GIC 2493 : return true;
985 : }
986 :
987 : /* Make a new request for each of the async subplans that need it. */
 : /* Detach as_needrequest first: requests fired below may complete at
 : * once, and ExecAsyncAppendResponse then re-adds members to the
 : * node's (now empty) as_needrequest for the next round. */
988 3266 : needrequest = node->as_needrequest;
989 3266 : node->as_needrequest = NULL;
990 CBC 3266 : i = -1;
991 GIC 8969 : while ((i = bms_next_member(needrequest, i)) >= 0)
992 ECB : {
993 CBC 5703 : AsyncRequest *areq = node->as_asyncrequests[i];
994 ECB :
995 : /* Do the actual work. */
996 GIC 5703 : ExecAsyncRequest(areq);
997 : }
998 CBC 3266 : bms_free(needrequest);
999 ECB :
1000 : /* Return one of the asynchronously-generated results if any. */
1001 CBC 3266 : if (node->as_nasyncresults > 0)
1002 : {
1003 3213 : --node->as_nasyncresults;
1004 GIC 3213 : *result = node->as_asyncresults[node->as_nasyncresults];
1005 3213 : return true;
1006 ECB : }
1007 :
1008 CBC 53 : return false;
1009 : }
1010 :
1011 ECB : /* ----------------------------------------------------------------
1012 : * ExecAppendAsyncEventWait
1013 : *
1014 : * Wait or poll for file descriptor events and fire callbacks.
1015 : * ----------------------------------------------------------------
1016 : */
1017 : static void
1018 CBC 89 : ExecAppendAsyncEventWait(AppendState *node)
1019 : {
 : /* One event per async subplan, plus one for the postmaster-death
 : * event registered just below. */
1020 GIC 89 : int nevents = node->as_nasyncplans + 1;
 : /* Block indefinitely (-1) only when no sync subplans remain;
 : * otherwise just poll (0) so sync subplans aren't starved. */
1021 89 : long timeout = node->as_syncdone ? -1 : 0;
1022 : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1023 : int noccurred;
1024 : int i;
1025 :
1026 : /* We should never be called when there are no valid async subplans. */
1027 89 : Assert(node->as_nasyncremain > 0);
1028 ECB :
 : /* A fresh wait-event set is built for each call and freed on every
 : * return path below. */
1029 GIC 89 : node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
1030 CBC 89 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1031 ECB : NULL, NULL);
1032 :
1033 : /* Give each waiting subplan a chance to add an event. */
1034 GIC 89 : i = -1;
1035 271 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1036 : {
1037 CBC 182 : AsyncRequest *areq = node->as_asyncrequests[i];
1038 :
1039 182 : if (areq->callback_pending)
1040 163 : ExecAsyncConfigureWait(areq);
1041 : }
1042 :
1043 : /*
1044 ECB : * No need for further processing if there are no configured events other
1045 : * than the postmaster death event.
1046 : */
1047 CBC 89 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1048 : {
1049 1 : FreeWaitEventSet(node->as_eventset);
1050 1 : node->as_eventset = NULL;
1051 GIC 1 : return;
1052 : }
1053 :
1054 : /* We wait on at most EVENT_BUFFER_SIZE events. */
1055 88 : if (nevents > EVENT_BUFFER_SIZE)
1056 UIC 0 : nevents = EVENT_BUFFER_SIZE;
1057 ECB :
1058 : /*
1059 : * If the timeout is -1, wait until at least one event occurs. If the
1060 : * timeout is 0, poll for events, but do not wait at all.
1061 : */
1062 GIC 88 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1063 : nevents, WAIT_EVENT_APPEND_READY);
1064 88 : FreeWaitEventSet(node->as_eventset);
1065 CBC 88 : node->as_eventset = NULL;
1066 GBC 88 : if (noccurred == 0)
1067 UIC 0 : return;
1068 :
1069 : /* Deliver notifications. */
1070 GIC 233 : for (i = 0; i < noccurred; i++)
1071 : {
1072 CBC 145 : WaitEvent *w = &occurred_event[i];
1073 :
1074 ECB : /*
1075 : * Each waiting subplan should have registered its wait event with
1076 : * user_data pointing back to its AsyncRequest.
1077 EUB : */
1078 GIC 145 : if ((w->events & WL_SOCKET_READABLE) != 0)
1079 : {
1080 CBC 145 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1081 :
1082 145 : if (areq->callback_pending)
1083 : {
1084 : /*
1085 : * Mark it as no longer needing a callback. We must do this
1086 : * before dispatching the callback in case the callback resets
1087 : * the flag.
1088 ECB : */
1089 GIC 145 : areq->callback_pending = false;
1090 ECB :
1091 : /* Do the actual work. */
1092 CBC 145 : ExecAsyncNotify(areq);
1093 : }
1094 : }
1095 : }
1096 : }
1097 :
1098 : /* ----------------------------------------------------------------
1099 ECB : * ExecAsyncAppendResponse
1100 : *
1101 : * Receive a response from an asynchronous request we made.
1102 : * ----------------------------------------------------------------
1103 : */
1104 : void
1105 GIC 5920 : ExecAsyncAppendResponse(AsyncRequest *areq)
1106 : {
1107 5920 : AppendState *node = (AppendState *) areq->requestor;
1108 5920 : TupleTableSlot *slot = areq->result;
1109 :
1110 : /* The result should be a TupleTableSlot or NULL. */
1111 5920 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1112 :
1113 : /* Nothing to do if the request is pending. */
1114 5920 : if (!areq->request_complete)
1115 ECB : {
1116 : /* The request would have been pending for a callback. */
1117 CBC 154 : Assert(areq->callback_pending);
1118 154 : return;
1119 : }
1120 :
1121 ECB : /* If the result is NULL or an empty slot, there's nothing more to do. */
1122 GIC 5766 : if (TupIsNull(slot))
1123 : {
1124 ECB : /* The ending subplan wouldn't have been pending for a callback. */
 : /* That subplan is exhausted; one fewer async subplan remains. */
1125 GIC 59 : Assert(!areq->callback_pending);
1126 59 : --node->as_nasyncremain;
1127 CBC 59 : return;
1128 ECB : }
1129 :
1130 : /* Save result so we can return it. */
1131 GIC 5707 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1132 CBC 5707 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1133 :
1134 : /*
1135 ECB : * Mark the subplan that returned a result as ready for a new request. We
1136 : * don't launch another one here immediately because it might complete.
1137 : */
1138 GIC 5707 : node->as_needrequest = bms_add_member(node->as_needrequest,
1139 : areq->request_index);
1140 : }
1141 ECB :
1142 : /* ----------------------------------------------------------------
1143 : * classify_matching_subplans
1144 : *
1145 : * Classify the node's as_valid_subplans into sync ones and
1146 : * async ones, adjust it to contain sync ones only, and save
1147 : * async ones in the node's as_valid_asyncplans.
1148 : * ----------------------------------------------------------------
1149 : */
1150 : static void
1151 GIC 45 : classify_matching_subplans(AppendState *node)
1152 : {
1153 : Bitmapset *valid_asyncplans;
1154 :
1155 GNC 45 : Assert(node->as_valid_subplans_identified);
1156 GIC 45 : Assert(node->as_valid_asyncplans == NULL);
1157 :
1158 : /* Nothing to do if there are no valid subplans. */
1159 45 : if (bms_is_empty(node->as_valid_subplans))
1160 : {
1161 UIC 0 : node->as_syncdone = true;
1162 LBC 0 : node->as_nasyncremain = 0;
1163 UIC 0 : return;
1164 : }
1165 :
1166 ECB : /* Nothing to do if there are no valid async subplans. */
 : /* (as_valid_asyncplans stays NULL on this path.) */
1167 CBC 45 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1168 : {
1169 UIC 0 : node->as_nasyncremain = 0;
1170 LBC 0 : return;
1171 : }
1172 EUB :
1173 : /* Get valid async subplans. */
1174 GNC 45 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1175 45 : node->as_valid_subplans);
1176 :
1177 ECB : /* Adjust the valid subplans to contain sync subplans only. */
1178 GIC 45 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1179 EUB : valid_asyncplans);
1180 :
1181 : /* Save valid async subplans. */
1182 GIC 45 : node->as_valid_asyncplans = valid_asyncplans;
1183 : }
|