Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeAppend.c
4 : * routines to handle append nodes.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitAppend - initialize the append node
17 : * ExecAppend - retrieve the next tuple from the node
18 : * ExecEndAppend - shut down the append node
19 : * ExecReScanAppend - rescan the append node
20 : *
21 : * NOTES
22 : * Each append node contains a list of one or more subplans which
23 : * must be iteratively processed (forwards or backwards).
24 : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : * until the subplan stops returning tuples, at which point that
26 : * plan is shut down and the next started up.
27 : *
28 : * Append nodes don't make use of their left and right
29 : * subtrees, rather they maintain a list of subplans so
30 : * a typical append node looks like this in the plan tree:
31 : *
32 : * ...
33 : * /
34 : * Append -------+------+------+--- nil
35 : * / \ | | |
36 : * nil nil ... ... ...
37 : * subplans
38 : *
39 : * Append nodes are currently used for unions, and to support
40 : * inheritance queries, where several relations need to be scanned.
41 : * For example, in our standard person/student/employee/student-emp
42 : * example, where student and employee inherit from person
43 : * and student-emp inherits from student and employee, the
44 : * query:
45 : *
46 : * select name from person
47 : *
48 : * generates the plan:
49 : *
50 : * |
51 : * Append -------+-------+--------+--------+
52 : * / \ | | | |
53 : * nil nil Scan Scan Scan Scan
54 : * | | | |
55 : * person employee student student-emp
56 : */
57 :
58 : #include "postgres.h"
59 :
60 : #include "executor/execAsync.h"
61 : #include "executor/execdebug.h"
62 : #include "executor/execPartition.h"
63 : #include "executor/nodeAppend.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "storage/latch.h"
67 :
68 : /* Shared state for parallel-aware Append. */
69 : struct ParallelAppendState
70 : {
71 : LWLock pa_lock; /* mutual exclusion to choose next subplan */
72 : int pa_next_plan; /* next plan to choose by any worker */
73 :
74 : /*
75 : * pa_finished[i] should be true if no more workers should select subplan
76 : * i. for a non-partial plan, this should be set to true as soon as a
77 : * worker selects the plan; for a partial plan, it remains false until
78 : * some worker executes the plan to completion.
79 : */
80 : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
81 : };
82 :
83 : #define INVALID_SUBPLAN_INDEX -1
84 : #define EVENT_BUFFER_SIZE 16
85 :
86 : static TupleTableSlot *ExecAppend(PlanState *pstate);
87 : static bool choose_next_subplan_locally(AppendState *node);
88 : static bool choose_next_subplan_for_leader(AppendState *node);
89 : static bool choose_next_subplan_for_worker(AppendState *node);
90 : static void mark_invalid_subplans_as_finished(AppendState *node);
91 : static void ExecAppendAsyncBegin(AppendState *node);
92 : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 : static void ExecAppendAsyncEventWait(AppendState *node);
95 : static void classify_matching_subplans(AppendState *node);
96 :
97 : /* ----------------------------------------------------------------
98 : * ExecInitAppend
99 : *
100 : * Begin all of the subscans of the append node.
101 : *
102 : * (This is potentially wasteful, since the entire result of the
103 : * append node may not be scanned, but this way all of the
104 : * structures get allocated in the executor's top level memory
105 : * block instead of that of the call to ExecAppend.)
106 : * ----------------------------------------------------------------
107 : */
108 : AppendState *
6249 tgl 109 CBC 6518 : ExecInitAppend(Append *node, EState *estate, int eflags)
110 : {
7430 111 6518 : AppendState *appendstate = makeNode(AppendState);
112 : PlanState **appendplanstates;
113 : Bitmapset *validsubplans;
114 : Bitmapset *asyncplans;
115 : int nplans;
116 : int nasyncplans;
117 : int firstvalid;
118 : int i,
119 : j;
120 :
121 : /* check for unsupported flags */
6249 122 6518 : Assert(!(eflags & EXEC_FLAG_MARK));
123 :
124 : /*
125 : * create new AppendState for our append node
126 : */
7430 127 6518 : appendstate->ps.plan = (Plan *) node;
128 6518 : appendstate->ps.state = estate;
2092 andres 129 6518 : appendstate->ps.ExecProcNode = ExecAppend;
130 :
131 : /* Let choose_next_subplan_* function handle setting the first subplan */
1828 alvherre 132 6518 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
739 efujita 133 6518 : appendstate->as_syncdone = false;
134 6518 : appendstate->as_begun = false;
135 :
136 : /* If run-time partition pruning is enabled, then set that up now */
129 alvherre 137 GNC 6518 : if (node->part_prune_index >= 0)
138 : {
139 : PartitionPruneState *prunestate;
140 :
141 : /*
142 : * Set up pruning data structure. This also initializes the set of
143 : * subplans to initialize (validsubplans) by taking into account the
144 : * result of performing initial pruning if any.
145 : */
369 alvherre 146 CBC 275 : prunestate = ExecInitPartitionPruning(&appendstate->ps,
147 275 : list_length(node->appendplans),
148 : node->part_prune_index,
149 : node->apprelids,
150 : &validsubplans);
1761 tgl 151 GIC 275 : appendstate->as_prune_state = prunestate;
369 alvherre 152 CBC 275 : nplans = bms_num_members(validsubplans);
1828 alvherre 153 ECB :
154 : /*
155 : * When no run-time pruning is required and there's at least one
156 : * subplan, we can fill as_valid_subplans immediately, preventing
157 : * later calls to ExecFindMatchingSubPlans.
158 : */
1215 tgl 159 GIC 275 : if (!prunestate->do_exec_prune && nplans > 0)
160 : {
1828 alvherre 161 CBC 70 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
38 tgl 162 GNC 70 : appendstate->as_valid_subplans_identified = true;
163 : }
164 : }
1828 alvherre 165 ECB : else
166 : {
1828 alvherre 167 GIC 6243 : nplans = list_length(node->appendplans);
168 :
169 : /*
170 : * When run-time partition pruning is not enabled we can just mark all
1764 tgl 171 ECB : * subplans as valid; they must also all be initialized.
172 : */
1714 alvherre 173 GIC 6243 : Assert(nplans > 0);
1828 174 6243 : appendstate->as_valid_subplans = validsubplans =
175 6243 : bms_add_range(NULL, 0, nplans - 1);
38 tgl 176 GNC 6243 : appendstate->as_valid_subplans_identified = true;
1828 alvherre 177 GIC 6243 : appendstate->as_prune_state = NULL;
1828 alvherre 178 ECB : }
9345 bruce 179 :
8053 180 : /*
1878 andres 181 : * Initialize result tuple type and slot.
9345 bruce 182 : */
1606 andres 183 GIC 6518 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
184 :
185 : /* node returns slots from each of its subnodes, therefore not fixed */
186 6518 : appendstate->ps.resultopsset = true;
187 6518 : appendstate->ps.resultopsfixed = false;
9345 bruce 188 ECB :
1828 alvherre 189 GIC 6518 : appendplanstates = (PlanState **) palloc(nplans *
190 : sizeof(PlanState *));
1828 alvherre 191 ECB :
8053 bruce 192 : /*
193 : * call ExecInitNode on each of the valid plans to be executed and save
1828 alvherre 194 : * the results into the appendplanstates array.
195 : *
196 : * While at it, find out the first valid partial plan.
197 : */
1357 drowley 198 GIC 6518 : j = 0;
739 efujita 199 6518 : asyncplans = NULL;
200 6518 : nasyncplans = 0;
1818 alvherre 201 6518 : firstvalid = nplans;
1357 drowley 202 6518 : i = -1;
1357 drowley 203 CBC 25418 : while ((i = bms_next_member(validsubplans, i)) >= 0)
9345 bruce 204 ECB : {
1357 drowley 205 CBC 18900 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
9345 bruce 206 ECB :
739 efujita 207 : /*
208 : * Record async subplans. When executing EvalPlanQual, we treat them
209 : * as sync ones; don't do this when initializing an EvalPlanQual plan
210 : * tree.
211 : */
739 efujita 212 GIC 18900 : if (initNode->async_capable && estate->es_epq_active == NULL)
213 : {
214 90 : asyncplans = bms_add_member(asyncplans, j);
215 90 : nasyncplans++;
216 : }
739 efujita 217 ECB :
218 : /*
1357 drowley 219 : * Record the lowest appendplans index which is a valid partial plan.
220 : */
1357 drowley 221 GIC 18900 : if (i >= node->first_partial_plan && j < firstvalid)
222 221 : firstvalid = j;
223 :
224 18900 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
225 : }
9345 bruce 226 ECB :
1818 alvherre 227 CBC 6518 : appendstate->as_first_partial_plan = firstvalid;
1828 alvherre 228 GIC 6518 : appendstate->appendplans = appendplanstates;
1828 alvherre 229 CBC 6518 : appendstate->as_nplans = nplans;
230 :
231 : /* Initialize async state */
739 efujita 232 6518 : appendstate->as_asyncplans = asyncplans;
233 6518 : appendstate->as_nasyncplans = nasyncplans;
234 6518 : appendstate->as_asyncrequests = NULL;
671 efujita 235 GIC 6518 : appendstate->as_asyncresults = NULL;
236 6518 : appendstate->as_nasyncresults = 0;
671 efujita 237 CBC 6518 : appendstate->as_nasyncremain = 0;
739 238 6518 : appendstate->as_needrequest = NULL;
239 6518 : appendstate->as_eventset = NULL;
671 240 6518 : appendstate->as_valid_asyncplans = NULL;
739 efujita 241 ECB :
739 efujita 242 CBC 6518 : if (nasyncplans > 0)
739 efujita 243 ECB : {
739 efujita 244 CBC 46 : appendstate->as_asyncrequests = (AsyncRequest **)
245 46 : palloc0(nplans * sizeof(AsyncRequest *));
246 :
247 46 : i = -1;
739 efujita 248 GIC 136 : while ((i = bms_next_member(asyncplans, i)) >= 0)
739 efujita 249 ECB : {
250 : AsyncRequest *areq;
251 :
739 efujita 252 CBC 90 : areq = palloc(sizeof(AsyncRequest));
253 90 : areq->requestor = (PlanState *) appendstate;
739 efujita 254 GIC 90 : areq->requestee = appendplanstates[i];
255 90 : areq->request_index = i;
256 90 : areq->callback_pending = false;
739 efujita 257 CBC 90 : areq->request_complete = false;
258 90 : areq->result = NULL;
739 efujita 259 ECB :
739 efujita 260 CBC 90 : appendstate->as_asyncrequests[i] = areq;
739 efujita 261 ECB : }
671 262 :
671 efujita 263 CBC 46 : appendstate->as_asyncresults = (TupleTableSlot **)
671 efujita 264 GIC 46 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
671 efujita 265 ECB :
38 tgl 266 GNC 46 : if (appendstate->as_valid_subplans_identified)
671 efujita 267 GIC 43 : classify_matching_subplans(appendstate);
739 efujita 268 ECB : }
269 :
270 : /*
1878 andres 271 : * Miscellaneous initialization
9345 bruce 272 : */
273 :
1828 alvherre 274 GIC 6518 : appendstate->ps.ps_ProjInfo = NULL;
275 :
276 : /* For parallel query, this will be overridden later. */
1951 rhaas 277 6518 : appendstate->choose_next_subplan = choose_next_subplan_locally;
278 :
7430 tgl 279 CBC 6518 : return appendstate;
280 : }
281 :
9770 scrappy 282 ECB : /* ----------------------------------------------------------------
283 : * ExecAppend
9345 bruce 284 : *
285 : * Handles iteration over multiple subplans.
286 : * ----------------------------------------------------------------
287 : */
288 : static TupleTableSlot *
2092 andres 289 GIC 1774005 : ExecAppend(PlanState *pstate)
290 : {
291 1774005 : AppendState *node = castNode(AppendState, pstate);
292 : TupleTableSlot *result;
293 :
739 efujita 294 ECB : /*
295 : * If this is the first call after Init or ReScan, we need to do the
296 : * initialization work.
297 : */
739 efujita 298 GIC 1774005 : if (!node->as_begun)
299 : {
300 46334 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
301 46334 : Assert(!node->as_syncdone);
302 :
1215 tgl 303 ECB : /* Nothing to do if there are no subplans */
1215 tgl 304 GIC 46334 : if (node->as_nplans == 0)
1215 tgl 305 CBC 18 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
1215 tgl 306 ECB :
307 : /* If there are any async subplans, begin executing them. */
739 efujita 308 GIC 46316 : if (node->as_nasyncplans > 0)
739 efujita 309 CBC 36 : ExecAppendAsyncBegin(node);
739 efujita 310 ECB :
311 : /*
312 : * If no sync subplan has been chosen, we must choose one before
1828 alvherre 313 : * proceeding.
314 : */
739 efujita 315 GIC 46316 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
1828 alvherre 316 1610 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
317 :
739 efujita 318 44706 : Assert(node->as_syncdone ||
319 : (node->as_whichplan >= 0 &&
739 efujita 320 ECB : node->as_whichplan < node->as_nplans));
321 :
322 : /* And we're initialized. */
739 efujita 323 CBC 44706 : node->as_begun = true;
324 : }
325 :
326 : for (;;)
9345 bruce 327 GIC 48084 : {
6531 tgl 328 ECB : PlanState *subnode;
329 :
2084 andres 330 GIC 1820461 : CHECK_FOR_INTERRUPTS();
331 :
8053 bruce 332 ECB : /*
333 : * try to get a tuple from an async subplan if any
334 : */
739 efujita 335 CBC 1820461 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
336 : {
739 efujita 337 GIC 5736 : if (ExecAppendAsyncGetNext(node, &result))
338 5736 : return result;
739 efujita 339 UIC 0 : Assert(!node->as_syncdone);
739 efujita 340 LBC 0 : Assert(bms_is_empty(node->as_needrequest));
341 : }
739 efujita 342 ECB :
343 : /*
739 efujita 344 EUB : * figure out which sync subplan we are currently processing
9345 bruce 345 : */
1951 rhaas 346 GIC 1814725 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
6531 tgl 347 1814725 : subnode = node->appendplans[node->as_whichplan];
348 :
349 : /*
350 : * get a tuple from the subplan
9345 bruce 351 ECB : */
6531 tgl 352 CBC 1814725 : result = ExecProcNode(subnode);
353 :
6531 tgl 354 GIC 1814699 : if (!TupIsNull(result))
355 : {
356 : /*
6385 bruce 357 ECB : * If the subplan gave us something then return it as-is. We do
358 : * NOT make use of the result slot that was set up in
4929 tgl 359 : * ExecInitAppend; there's no need for it.
360 : */
6531 tgl 361 GIC 1722185 : return result;
362 : }
363 :
364 : /*
365 : * wait or poll for async events if any. We do this before checking
697 efujita 366 ECB : * for the end of iteration, because it might drain the remaining
367 : * async subplans.
368 : */
739 efujita 369 GIC 92514 : if (node->as_nasyncremain > 0)
370 17 : ExecAppendAsyncEventWait(node);
371 :
372 : /* choose new sync subplan; if no sync/async subplans, we're done */
373 92514 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
6531 tgl 374 CBC 44430 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
9345 bruce 375 ECB : }
376 : }
377 :
9770 scrappy 378 : /* ----------------------------------------------------------------
9345 bruce 379 : * ExecEndAppend
380 : *
381 : * Shuts down the subscans of the append node.
382 : *
383 : * Returns nothing of interest.
384 : * ----------------------------------------------------------------
385 : */
386 : void
7430 tgl 387 GIC 6379 : ExecEndAppend(AppendState *node)
388 : {
389 : PlanState **appendplans;
390 : int nplans;
391 : int i;
9345 bruce 392 ECB :
393 : /*
394 : * get information from the node
395 : */
9034 bruce 396 GIC 6379 : appendplans = node->appendplans;
7430 tgl 397 6379 : nplans = node->as_nplans;
398 :
399 : /*
400 : * shut down each of the subscans
9345 bruce 401 ECB : */
9345 bruce 402 CBC 24956 : for (i = 0; i < nplans; i++)
4929 tgl 403 GIC 18577 : ExecEndNode(appendplans[i]);
9345 bruce 404 6379 : }
405 :
406 : void
4654 tgl 407 CBC 41962 : ExecReScanAppend(AppendState *node)
9034 bruce 408 ECB : {
739 efujita 409 CBC 41962 : int nasyncplans = node->as_nasyncplans;
410 : int i;
411 :
1828 alvherre 412 ECB : /*
413 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
1764 tgl 414 : * we'd better unset the valid subplans so that they are reselected for
415 : * the new parameter values.
416 : */
1828 alvherre 417 GIC 43596 : if (node->as_prune_state &&
418 1634 : bms_overlap(node->ps.chgParam,
1764 tgl 419 1634 : node->as_prune_state->execparamids))
420 : {
38 tgl 421 GNC 1634 : node->as_valid_subplans_identified = false;
1828 alvherre 422 GIC 1634 : bms_free(node->as_valid_subplans);
1828 alvherre 423 CBC 1634 : node->as_valid_subplans = NULL;
38 tgl 424 GNC 1634 : bms_free(node->as_valid_asyncplans);
425 1634 : node->as_valid_asyncplans = NULL;
1828 alvherre 426 ECB : }
427 :
4929 tgl 428 CBC 137173 : for (i = 0; i < node->as_nplans; i++)
429 : {
7188 bruce 430 GIC 95211 : PlanState *subnode = node->appendplans[i];
7836 bruce 431 ECB :
432 : /*
8006 tgl 433 : * ExecReScan doesn't know about my subplans, so I have to do
434 : * changed-parameter signaling myself.
435 : */
7364 tgl 436 GIC 95211 : if (node->ps.chgParam != NULL)
437 92940 : UpdateChangedParamSet(subnode, node->ps.chgParam);
438 :
8006 tgl 439 ECB : /*
6272 440 : * If chgParam of subnode is not null then plan will be re-scanned by
441 : * first ExecProcNode or by first ExecAsyncRequest.
442 : */
4654 tgl 443 GIC 95211 : if (subnode->chgParam == NULL)
444 40479 : ExecReScan(subnode);
445 : }
1951 rhaas 446 ECB :
739 efujita 447 : /* Reset async state */
739 efujita 448 GIC 41962 : if (nasyncplans > 0)
449 : {
450 17 : i = -1;
739 efujita 451 CBC 51 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
452 : {
453 34 : AsyncRequest *areq = node->as_asyncrequests[i];
739 efujita 454 ECB :
739 efujita 455 GIC 34 : areq->callback_pending = false;
739 efujita 456 CBC 34 : areq->request_complete = false;
739 efujita 457 GIC 34 : areq->result = NULL;
739 efujita 458 ECB : }
459 :
671 efujita 460 CBC 17 : node->as_nasyncresults = 0;
671 efujita 461 GIC 17 : node->as_nasyncremain = 0;
739 462 17 : bms_free(node->as_needrequest);
739 efujita 463 CBC 17 : node->as_needrequest = NULL;
739 efujita 464 ECB : }
465 :
1828 alvherre 466 : /* Let choose_next_subplan_* function handle setting the first subplan */
1828 alvherre 467 GIC 41962 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
739 efujita 468 41962 : node->as_syncdone = false;
469 41962 : node->as_begun = false;
1951 rhaas 470 CBC 41962 : }
1951 rhaas 471 ECB :
472 : /* ----------------------------------------------------------------
473 : * Parallel Append Support
474 : * ----------------------------------------------------------------
475 : */
476 :
477 : /* ----------------------------------------------------------------
478 : * ExecAppendEstimate
479 : *
480 : * Compute the amount of space we'll need in the parallel
481 : * query DSM, and inform pcxt->estimator about our needs.
482 : * ----------------------------------------------------------------
483 : */
484 : void
1951 rhaas 485 GIC 66 : ExecAppendEstimate(AppendState *node,
486 : ParallelContext *pcxt)
487 : {
1951 rhaas 488 CBC 66 : node->pstate_len =
1951 rhaas 489 GIC 66 : add_size(offsetof(ParallelAppendState, pa_finished),
490 66 : sizeof(bool) * node->as_nplans);
1951 rhaas 491 ECB :
1951 rhaas 492 CBC 66 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
493 66 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1951 rhaas 494 GIC 66 : }
1951 rhaas 495 ECB :
496 :
497 : /* ----------------------------------------------------------------
498 : * ExecAppendInitializeDSM
499 : *
500 : * Set up shared state for Parallel Append.
501 : * ----------------------------------------------------------------
502 : */
503 : void
1951 rhaas 504 GIC 66 : ExecAppendInitializeDSM(AppendState *node,
505 : ParallelContext *pcxt)
506 : {
1951 rhaas 507 ECB : ParallelAppendState *pstate;
508 :
1951 rhaas 509 GIC 66 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
510 66 : memset(pstate, 0, node->pstate_len);
511 66 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
1951 rhaas 512 CBC 66 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
1951 rhaas 513 ECB :
1951 rhaas 514 CBC 66 : node->as_pstate = pstate;
515 66 : node->choose_next_subplan = choose_next_subplan_for_leader;
1951 rhaas 516 GIC 66 : }
1951 rhaas 517 ECB :
518 : /* ----------------------------------------------------------------
519 : * ExecAppendReInitializeDSM
520 : *
521 : * Reset shared state before beginning a fresh scan.
522 : * ----------------------------------------------------------------
523 : */
524 : void
1951 rhaas 525 UIC 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
526 : {
527 0 : ParallelAppendState *pstate = node->as_pstate;
1951 rhaas 528 EUB :
1951 rhaas 529 UIC 0 : pstate->pa_next_plan = 0;
1951 rhaas 530 UBC 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
1951 rhaas 531 UIC 0 : }
1951 rhaas 532 EUB :
533 : /* ----------------------------------------------------------------
534 : * ExecAppendInitializeWorker
535 : *
536 : * Copy relevant information from TOC into planstate, and initialize
537 : * whatever is required to choose and execute the optimal subplan.
538 : * ----------------------------------------------------------------
539 : */
540 : void
1951 rhaas 541 GIC 152 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
542 : {
543 152 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
1951 rhaas 544 CBC 152 : node->choose_next_subplan = choose_next_subplan_for_worker;
1951 rhaas 545 GIC 152 : }
1951 rhaas 546 ECB :
547 : /* ----------------------------------------------------------------
548 : * choose_next_subplan_locally
549 : *
550 : * Choose next sync subplan for a non-parallel-aware Append,
551 : * returning false if there are no more.
552 : * ----------------------------------------------------------------
553 : */
554 : static bool
1951 rhaas 555 GIC 138372 : choose_next_subplan_locally(AppendState *node)
556 : {
557 138372 : int whichplan = node->as_whichplan;
1828 alvherre 558 ECB : int nextplan;
559 :
560 : /* We should never be called when there are no subplans */
1215 tgl 561 GIC 138372 : Assert(node->as_nplans > 0);
562 :
563 : /* Nothing to do if syncdone */
739 efujita 564 CBC 138372 : if (node->as_syncdone)
739 efujita 565 GIC 17 : return false;
566 :
1828 alvherre 567 ECB : /*
568 : * If first call then have the bms member function choose the first valid
569 : * sync subplan by initializing whichplan to -1. If there happen to be no
570 : * valid sync subplans then the bms member function will handle that by
571 : * returning a negative number which will allow us to exit returning a
572 : * false value.
573 : */
1828 alvherre 574 GIC 138355 : if (whichplan == INVALID_SUBPLAN_INDEX)
575 : {
739 efujita 576 46096 : if (node->as_nasyncplans > 0)
739 efujita 577 ECB : {
578 : /* We'd have filled as_valid_subplans already */
38 tgl 579 GNC 19 : Assert(node->as_valid_subplans_identified);
580 : }
581 46077 : else if (!node->as_valid_subplans_identified)
582 : {
1828 alvherre 583 CBC 1691 : node->as_valid_subplans =
369 alvherre 584 GIC 1691 : ExecFindMatchingSubPlans(node->as_prune_state, false);
38 tgl 585 GNC 1691 : node->as_valid_subplans_identified = true;
586 : }
1828 alvherre 587 ECB :
1828 alvherre 588 GIC 46096 : whichplan = -1;
1951 rhaas 589 ECB : }
1828 alvherre 590 :
591 : /* Ensure whichplan is within the expected range */
1828 alvherre 592 GIC 138355 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
593 :
1828 alvherre 594 CBC 138355 : if (ScanDirectionIsForward(node->ps.state->es_direction))
1828 alvherre 595 GIC 138346 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
596 : else
597 9 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
1828 alvherre 598 ECB :
1828 alvherre 599 GIC 138355 : if (nextplan < 0)
739 efujita 600 ECB : {
601 : /* Set as_syncdone if in async mode */
739 efujita 602 GIC 45853 : if (node->as_nasyncplans > 0)
739 efujita 603 CBC 17 : node->as_syncdone = true;
1828 alvherre 604 GIC 45853 : return false;
739 efujita 605 ECB : }
606 :
1828 alvherre 607 GIC 92502 : node->as_whichplan = nextplan;
1951 rhaas 608 ECB :
1951 rhaas 609 CBC 92502 : return true;
1951 rhaas 610 ECB : }
611 :
612 : /* ----------------------------------------------------------------
613 : * choose_next_subplan_for_leader
614 : *
615 : * Try to pick a plan which doesn't commit us to doing much
616 : * work locally, so that as much work as possible is done in
617 : * the workers. Cheapest subplans are at the end.
618 : * ----------------------------------------------------------------
619 : */
620 : static bool
1951 rhaas 621 GIC 241 : choose_next_subplan_for_leader(AppendState *node)
622 : {
623 241 : ParallelAppendState *pstate = node->as_pstate;
624 :
625 : /* Backward scan is not supported by parallel-aware plans */
626 241 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
1951 rhaas 627 ECB :
628 : /* We should never be called when there are no subplans */
1215 tgl 629 CBC 241 : Assert(node->as_nplans > 0);
630 :
1951 rhaas 631 GIC 241 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
1951 rhaas 632 ECB :
1951 rhaas 633 GIC 241 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
634 : {
1951 rhaas 635 ECB : /* Mark just-completed subplan as finished. */
1951 rhaas 636 GIC 184 : node->as_pstate->pa_finished[node->as_whichplan] = true;
1951 rhaas 637 ECB : }
638 : else
639 : {
640 : /* Start with last subplan. */
1951 rhaas 641 GIC 57 : node->as_whichplan = node->as_nplans - 1;
1828 alvherre 642 ECB :
643 : /*
644 : * If we've yet to determine the valid subplans then do so now. If
645 : * run-time pruning is disabled then the valid subplans will always be
646 : * set to all subplans.
647 : */
38 tgl 648 GNC 57 : if (!node->as_valid_subplans_identified)
649 : {
1828 alvherre 650 GIC 12 : node->as_valid_subplans =
369 651 12 : ExecFindMatchingSubPlans(node->as_prune_state, false);
38 tgl 652 GNC 12 : node->as_valid_subplans_identified = true;
653 :
654 : /*
1828 alvherre 655 ECB : * Mark each invalid plan as finished to allow the loop below to
656 : * select the first valid subplan.
657 : */
1828 alvherre 658 CBC 12 : mark_invalid_subplans_as_finished(node);
1828 alvherre 659 ECB : }
660 : }
661 :
662 : /* Loop until we find a subplan to execute. */
1951 rhaas 663 GIC 388 : while (pstate->pa_finished[node->as_whichplan])
664 : {
1951 rhaas 665 CBC 204 : if (node->as_whichplan == 0)
666 : {
1951 rhaas 667 GIC 57 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
668 57 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
669 57 : LWLockRelease(&pstate->pa_lock);
1951 rhaas 670 CBC 57 : return false;
671 : }
1826 alvherre 672 ECB :
673 : /*
674 : * We needn't pay attention to as_valid_subplans here as all invalid
675 : * plans have been marked as finished.
676 : */
1951 rhaas 677 CBC 147 : node->as_whichplan--;
678 : }
679 :
680 : /* If non-partial, immediately mark as finished. */
1818 alvherre 681 GIC 184 : if (node->as_whichplan < node->as_first_partial_plan)
1951 rhaas 682 61 : node->as_pstate->pa_finished[node->as_whichplan] = true;
683 :
1951 rhaas 684 CBC 184 : LWLockRelease(&pstate->pa_lock);
685 :
1951 rhaas 686 GIC 184 : return true;
687 : }
1951 rhaas 688 ECB :
689 : /* ----------------------------------------------------------------
690 : * choose_next_subplan_for_worker
691 : *
692 : * Choose next subplan for a parallel-aware Append, returning
693 : * false if there are no more.
694 : *
695 : * We start from the first plan and advance through the list;
696 : * when we get back to the end, we loop back to the first
697 : * partial plan. This assigns the non-partial plans first in
698 : * order of descending cost and then spreads out the workers
699 : * as evenly as possible across the remaining partial plans.
700 : * ----------------------------------------------------------------
701 : */
702 : static bool
1951 rhaas 703 GIC 217 : choose_next_subplan_for_worker(AppendState *node)
704 : {
705 217 : ParallelAppendState *pstate = node->as_pstate;
706 :
707 : /* Backward scan is not supported by parallel-aware plans */
708 217 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
709 :
1828 alvherre 710 ECB : /* We should never be called when there are no subplans */
1215 tgl 711 GIC 217 : Assert(node->as_nplans > 0);
1828 alvherre 712 ECB :
1951 rhaas 713 GIC 217 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
714 :
1951 rhaas 715 ECB : /* Mark just-completed subplan as finished. */
1951 rhaas 716 GIC 217 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
717 71 : node->as_pstate->pa_finished[node->as_whichplan] = true;
1951 rhaas 718 ECB :
719 : /*
1764 tgl 720 : * If we've yet to determine the valid subplans then do so now. If
721 : * run-time pruning is disabled then the valid subplans will always be set
722 : * to all subplans.
1828 alvherre 723 : */
38 tgl 724 GNC 146 : else if (!node->as_valid_subplans_identified)
725 : {
1828 alvherre 726 GIC 23 : node->as_valid_subplans =
369 727 23 : ExecFindMatchingSubPlans(node->as_prune_state, false);
38 tgl 728 GNC 23 : node->as_valid_subplans_identified = true;
729 :
1828 alvherre 730 GIC 23 : mark_invalid_subplans_as_finished(node);
731 : }
732 :
1951 rhaas 733 ECB : /* If all the plans are already done, we have nothing to do */
1951 rhaas 734 GIC 217 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
1951 rhaas 735 ECB : {
1951 rhaas 736 CBC 121 : LWLockRelease(&pstate->pa_lock);
737 121 : return false;
738 : }
1951 rhaas 739 ECB :
740 : /* Save the plan from which we are starting the search. */
1886 rhaas 741 GIC 96 : node->as_whichplan = pstate->pa_next_plan;
742 :
1826 alvherre 743 ECB : /* Loop until we find a valid subplan to execute. */
1951 rhaas 744 GIC 203 : while (pstate->pa_finished[pstate->pa_next_plan])
1951 rhaas 745 ECB : {
1826 alvherre 746 : int nextplan;
747 :
1826 alvherre 748 GIC 132 : nextplan = bms_next_member(node->as_valid_subplans,
749 : pstate->pa_next_plan);
1826 alvherre 750 CBC 132 : if (nextplan >= 0)
751 : {
752 : /* Advance to the next valid plan. */
753 99 : pstate->pa_next_plan = nextplan;
754 : }
1818 alvherre 755 GIC 33 : else if (node->as_whichplan > node->as_first_partial_plan)
756 : {
1826 alvherre 757 ECB : /*
758 : * Try looping back to the first valid partial plan, if there is
759 : * one. If there isn't, arrange to bail out below.
760 : */
1826 alvherre 761 GIC 18 : nextplan = bms_next_member(node->as_valid_subplans,
1818 alvherre 762 CBC 18 : node->as_first_partial_plan - 1);
1826 alvherre 763 GIC 18 : pstate->pa_next_plan =
1826 alvherre 764 CBC 18 : nextplan < 0 ? node->as_whichplan : nextplan;
765 : }
766 : else
767 : {
768 : /*
769 : * At last plan, and either there are no partial plans or we've
1886 rhaas 770 ECB : * tried them all. Arrange to bail out.
771 : */
1951 rhaas 772 CBC 15 : pstate->pa_next_plan = node->as_whichplan;
1951 rhaas 773 ECB : }
774 :
1951 rhaas 775 GIC 132 : if (pstate->pa_next_plan == node->as_whichplan)
776 : {
777 : /* We've tried everything! */
778 25 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
779 25 : LWLockRelease(&pstate->pa_lock);
780 25 : return false;
1951 rhaas 781 ECB : }
782 : }
783 :
784 : /* Pick the plan we found, and advance pa_next_plan one more time. */
1826 alvherre 785 GIC 71 : node->as_whichplan = pstate->pa_next_plan;
786 71 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
1826 alvherre 787 ECB : pstate->pa_next_plan);
788 :
789 : /*
790 : * If there are no more valid plans then try setting the next plan to the
791 : * first valid partial plan.
792 : */
1826 alvherre 793 GIC 71 : if (pstate->pa_next_plan < 0)
1951 rhaas 794 ECB : {
1826 alvherre 795 CBC 17 : int nextplan = bms_next_member(node->as_valid_subplans,
1818 alvherre 796 GIC 17 : node->as_first_partial_plan - 1);
797 :
1826 798 17 : if (nextplan >= 0)
799 17 : pstate->pa_next_plan = nextplan;
800 : else
801 : {
1950 rhaas 802 ECB : /*
803 : * There are no valid partial plans, and we already chose the last
1826 alvherre 804 : * non-partial plan; so flag that there's nothing more for our
805 : * fellow workers to do.
806 : */
1950 rhaas 807 LBC 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
1950 rhaas 808 ECB : }
809 : }
810 :
811 : /* If non-partial, immediately mark as finished. */
1818 alvherre 812 GIC 71 : if (node->as_whichplan < node->as_first_partial_plan)
1951 rhaas 813 8 : node->as_pstate->pa_finished[node->as_whichplan] = true;
814 :
815 71 : LWLockRelease(&pstate->pa_lock);
1951 rhaas 816 EUB :
1951 rhaas 817 GIC 71 : return true;
818 : }
819 :
820 : /*
1828 alvherre 821 ECB : * mark_invalid_subplans_as_finished
822 : * Marks the ParallelAppendState's pa_finished as true for each invalid
823 : * subplan.
824 : *
825 : * This function should only be called for parallel Append with run-time
826 : * pruning enabled.
827 : */
828 : static void
1828 alvherre 829 GIC 35 : mark_invalid_subplans_as_finished(AppendState *node)
830 : {
831 : int i;
832 :
833 : /* Only valid to call this while in parallel Append mode */
834 35 : Assert(node->as_pstate);
835 :
836 : /* Shouldn't have been called when run-time pruning is not enabled */
837 35 : Assert(node->as_prune_state);
1828 alvherre 838 ECB :
839 : /* Nothing to do if all plans are valid */
1828 alvherre 840 GIC 35 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
1828 alvherre 841 UIC 0 : return;
842 :
1828 alvherre 843 ECB : /* Mark all non-valid plans as finished */
1828 alvherre 844 GIC 113 : for (i = 0; i < node->as_nplans; i++)
845 : {
1828 alvherre 846 CBC 78 : if (!bms_is_member(i, node->as_valid_subplans))
1828 alvherre 847 GIC 35 : node->as_pstate->pa_finished[i] = true;
848 : }
1828 alvherre 849 ECB : }
739 efujita 850 EUB :
851 : /* ----------------------------------------------------------------
852 : * Asynchronous Append Support
739 efujita 853 ECB : * ----------------------------------------------------------------
854 : */
855 :
856 : /* ----------------------------------------------------------------
857 : * ExecAppendAsyncBegin
858 : *
859 : * Begin executing designed async-capable subplans.
860 : * ----------------------------------------------------------------
861 : */
862 : static void
739 efujita 863 GIC 36 : ExecAppendAsyncBegin(AppendState *node)
864 : {
865 : int i;
866 :
867 : /* Backward scan is not supported by async-aware Appends. */
868 36 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
869 :
870 : /* We should never be called when there are no subplans */
671 871 36 : Assert(node->as_nplans > 0);
671 efujita 872 ECB :
873 : /* We should never be called when there are no async subplans. */
739 efujita 874 GIC 36 : Assert(node->as_nasyncplans > 0);
875 :
876 : /* If we've yet to determine the valid subplans then do so now. */
38 tgl 877 GNC 36 : if (!node->as_valid_subplans_identified)
878 : {
739 efujita 879 GIC 2 : node->as_valid_subplans =
369 alvherre 880 CBC 2 : ExecFindMatchingSubPlans(node->as_prune_state, false);
38 tgl 881 GNC 2 : node->as_valid_subplans_identified = true;
882 :
671 efujita 883 GIC 2 : classify_matching_subplans(node);
671 efujita 884 ECB : }
885 :
886 : /* Initialize state variables. */
671 efujita 887 CBC 36 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
671 efujita 888 GIC 36 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
739 efujita 889 ECB :
890 : /* Nothing to do if there are no valid async subplans. */
739 efujita 891 CBC 36 : if (node->as_nasyncremain == 0)
739 efujita 892 UIC 0 : return;
739 efujita 893 ECB :
894 : /* Make a request for each of the valid async subplans. */
739 efujita 895 GIC 36 : i = -1;
896 105 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
739 efujita 897 ECB : {
739 efujita 898 CBC 69 : AsyncRequest *areq = node->as_asyncrequests[i];
899 :
739 efujita 900 GIC 69 : Assert(areq->request_index == i);
739 efujita 901 CBC 69 : Assert(!areq->callback_pending);
739 efujita 902 EUB :
903 : /* Do the actual work. */
739 efujita 904 GIC 69 : ExecAsyncRequest(areq);
739 efujita 905 ECB : }
906 : }
907 :
908 : /* ----------------------------------------------------------------
909 : * ExecAppendAsyncGetNext
910 : *
911 : * Get the next tuple from any of the asynchronous subplans.
912 : * ----------------------------------------------------------------
913 : */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
	*result = NULL;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* Request a tuple asynchronously. */
	if (ExecAppendAsyncRequest(node, result))
		return true;

	/*
	 * No result was immediately available.  Keep waiting for async events
	 * and re-requesting until either a tuple shows up or all async subplans
	 * are exhausted (as_nasyncremain reaches zero).
	 */
	while (node->as_nasyncremain > 0)
	{
		CHECK_FOR_INTERRUPTS();

		/* Wait or poll for async events. */
		ExecAppendAsyncEventWait(node);

		/* Request a tuple asynchronously. */
		if (ExecAppendAsyncRequest(node, result))
			return true;

		/* Break from loop if there's any sync subplan that isn't complete. */
		if (!node->as_syncdone)
			break;
	}

	/*
	 * If all sync subplans are complete, we're totally done scanning the
	 * given node.  Otherwise, we're done with the asynchronous stuff but must
	 * continue scanning the sync subplans.
	 */
	if (node->as_syncdone)
	{
		/* Signal end-of-scan to the caller by returning an empty slot. */
		Assert(node->as_nasyncremain == 0);
		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
		return true;
	}

	/* false tells the caller to go process the remaining sync subplans. */
	return false;
}
956 :
739 efujita 957 ECB : /* ----------------------------------------------------------------
958 : * ExecAppendAsyncRequest
959 : *
960 : * Request a tuple asynchronously.
961 : * ----------------------------------------------------------------
962 : */
963 : static bool
739 efujita 964 GBC 5808 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
965 : {
966 : Bitmapset *needrequest;
967 : int i;
968 :
969 : /* Nothing to do if there are no async subplans needing a new request. */
739 efujita 970 GIC 5808 : if (bms_is_empty(node->as_needrequest))
971 : {
716 972 49 : Assert(node->as_nasyncresults == 0);
739 973 49 : return false;
716 efujita 974 ECB : }
975 :
976 : /*
977 : * If there are any asynchronously-generated results that have not yet
978 : * been returned, we have nothing to do; just return one of them.
979 : */
739 efujita 980 CBC 5759 : if (node->as_nasyncresults > 0)
981 : {
982 2493 : --node->as_nasyncresults;
983 2493 : *result = node->as_asyncresults[node->as_nasyncresults];
739 efujita 984 GIC 2493 : return true;
985 : }
986 :
987 : /* Make a new request for each of the async subplans that need it. */
988 3266 : needrequest = node->as_needrequest;
989 3266 : node->as_needrequest = NULL;
739 efujita 990 CBC 3266 : i = -1;
739 efujita 991 GIC 8969 : while ((i = bms_next_member(needrequest, i)) >= 0)
739 efujita 992 ECB : {
739 efujita 993 CBC 5703 : AsyncRequest *areq = node->as_asyncrequests[i];
739 efujita 994 ECB :
995 : /* Do the actual work. */
739 efujita 996 GIC 5703 : ExecAsyncRequest(areq);
997 : }
739 efujita 998 CBC 3266 : bms_free(needrequest);
739 efujita 999 ECB :
1000 : /* Return one of the asynchronously-generated results if any. */
739 efujita 1001 CBC 3266 : if (node->as_nasyncresults > 0)
1002 : {
1003 3213 : --node->as_nasyncresults;
739 efujita 1004 GIC 3213 : *result = node->as_asyncresults[node->as_nasyncresults];
1005 3213 : return true;
739 efujita 1006 ECB : }
1007 :
739 efujita 1008 CBC 53 : return false;
1009 : }
1010 :
739 efujita 1011 ECB : /* ----------------------------------------------------------------
1012 : * ExecAppendAsyncEventWait
1013 : *
1014 : * Wait or poll for file descriptor events and fire callbacks.
1015 : * ----------------------------------------------------------------
1016 : */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
	/* +1 reserves a slot for the postmaster-death event added below. */
	int			nevents = node->as_nasyncplans + 1;
	/* Block indefinitely only if no sync subplans remain to fall back on. */
	long		timeout = node->as_syncdone ? -1 : 0;
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	int			noccurred;
	int			i;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* The event set is created fresh for each wait and freed before return. */
	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
					  NULL, NULL);

	/* Give each waiting subplan a chance to add an event. */
	i = -1;
	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		if (areq->callback_pending)
			ExecAsyncConfigureWait(areq);
	}

	/*
	 * No need for further processing if there are no configured events other
	 * than the postmaster death event.
	 */
	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
	{
		FreeWaitEventSet(node->as_eventset);
		node->as_eventset = NULL;
		return;
	}

	/* We wait on at most EVENT_BUFFER_SIZE events. */
	if (nevents > EVENT_BUFFER_SIZE)
		nevents = EVENT_BUFFER_SIZE;

	/*
	 * If the timeout is -1, wait until at least one event occurs.  If the
	 * timeout is 0, poll for events, but do not wait at all.
	 */
	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
								 nevents, WAIT_EVENT_APPEND_READY);
	FreeWaitEventSet(node->as_eventset);
	node->as_eventset = NULL;
	if (noccurred == 0)
		return;

	/* Deliver notifications. */
	for (i = 0; i < noccurred; i++)
	{
		WaitEvent  *w = &occurred_event[i];

		/*
		 * Each waiting subplan should have registered its wait event with
		 * user_data pointing back to its AsyncRequest.
		 */
		if ((w->events & WL_SOCKET_READABLE) != 0)
		{
			AsyncRequest *areq = (AsyncRequest *) w->user_data;

			if (areq->callback_pending)
			{
				/*
				 * Mark it as no longer needing a callback.  We must do this
				 * before dispatching the callback in case the callback resets
				 * the flag.
				 */
				areq->callback_pending = false;

				/* Do the actual work. */
				ExecAsyncNotify(areq);
			}
		}
	}
}
1097 :
1098 : /* ----------------------------------------------------------------
739 efujita 1099 ECB : * ExecAsyncAppendResponse
1100 : *
1101 : * Receive a response from an asynchronous request we made.
1102 : * ----------------------------------------------------------------
1103 : */
1104 : void
739 efujita 1105 GIC 5920 : ExecAsyncAppendResponse(AsyncRequest *areq)
1106 : {
1107 5920 : AppendState *node = (AppendState *) areq->requestor;
1108 5920 : TupleTableSlot *slot = areq->result;
1109 :
1110 : /* The result should be a TupleTableSlot or NULL. */
1111 5920 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1112 :
1113 : /* Nothing to do if the request is pending. */
1114 5920 : if (!areq->request_complete)
739 efujita 1115 ECB : {
1116 : /* The request would have been pending for a callback. */
739 efujita 1117 CBC 154 : Assert(areq->callback_pending);
1118 154 : return;
1119 : }
1120 :
739 efujita 1121 ECB : /* If the result is NULL or an empty slot, there's nothing more to do. */
739 efujita 1122 GIC 5766 : if (TupIsNull(slot))
1123 : {
739 efujita 1124 ECB : /* The ending subplan wouldn't have been pending for a callback. */
739 efujita 1125 GIC 59 : Assert(!areq->callback_pending);
1126 59 : --node->as_nasyncremain;
739 efujita 1127 CBC 59 : return;
739 efujita 1128 ECB : }
1129 :
1130 : /* Save result so we can return it. */
739 efujita 1131 GIC 5707 : Assert(node->as_nasyncresults < node->as_nasyncplans);
739 efujita 1132 CBC 5707 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1133 :
1134 : /*
739 efujita 1135 ECB : * Mark the subplan that returned a result as ready for a new request. We
1136 : * don't launch another one here immediately because it might complete.
1137 : */
739 efujita 1138 GIC 5707 : node->as_needrequest = bms_add_member(node->as_needrequest,
1139 : areq->request_index);
1140 : }
739 efujita 1141 ECB :
1142 : /* ----------------------------------------------------------------
1143 : * classify_matching_subplans
1144 : *
1145 : * Classify the node's as_valid_subplans into sync ones and
1146 : * async ones, adjust it to contain sync ones only, and save
1147 : * async ones in the node's as_valid_asyncplans.
1148 : * ----------------------------------------------------------------
1149 : */
1150 : static void
739 efujita 1151 GIC 45 : classify_matching_subplans(AppendState *node)
1152 : {
1153 : Bitmapset *valid_asyncplans;
1154 :
38 tgl 1155 GNC 45 : Assert(node->as_valid_subplans_identified);
739 efujita 1156 GIC 45 : Assert(node->as_valid_asyncplans == NULL);
1157 :
1158 : /* Nothing to do if there are no valid subplans. */
1159 45 : if (bms_is_empty(node->as_valid_subplans))
1160 : {
739 efujita 1161 UIC 0 : node->as_syncdone = true;
739 efujita 1162 LBC 0 : node->as_nasyncremain = 0;
739 efujita 1163 UIC 0 : return;
1164 : }
1165 :
739 efujita 1166 ECB : /* Nothing to do if there are no valid async subplans. */
739 efujita 1167 CBC 45 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1168 : {
739 efujita 1169 UIC 0 : node->as_nasyncremain = 0;
739 efujita 1170 LBC 0 : return;
1171 : }
739 efujita 1172 EUB :
1173 : /* Get valid async subplans. */
38 tgl 1174 GNC 45 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1175 45 : node->as_valid_subplans);
1176 :
739 efujita 1177 ECB : /* Adjust the valid subplans to contain sync subplans only. */
739 efujita 1178 GIC 45 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
739 efujita 1179 EUB : valid_asyncplans);
1180 :
1181 : /* Save valid async subplans. */
739 efujita 1182 GIC 45 : node->as_valid_asyncplans = valid_asyncplans;
1183 : }
|