Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * nodeAppend.c
4 : : * routines to handle append nodes.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/executor/nodeAppend.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : /* INTERFACE ROUTINES
16 : : * ExecInitAppend - initialize the append node
17 : : * ExecAppend - retrieve the next tuple from the node
18 : : * ExecEndAppend - shut down the append node
19 : : * ExecReScanAppend - rescan the append node
20 : : *
21 : : * NOTES
22 : : * Each append node contains a list of one or more subplans which
23 : : * must be iteratively processed (forwards or backwards).
24 : : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : : * until the subplan stops returning tuples, at which point that
26 : : * plan is shut down and the next started up.
27 : : *
28 : : * Append nodes don't make use of their left and right
29 : : * subtrees, rather they maintain a list of subplans so
30 : : * a typical append node looks like this in the plan tree:
31 : : *
32 : : * ...
33 : : * /
34 : : * Append -------+------+------+--- nil
35 : : * / \ | | |
36 : : * nil nil ... ... ...
37 : : * subplans
38 : : *
39 : : * Append nodes are currently used for unions, and to support
40 : : * inheritance queries, where several relations need to be scanned.
41 : : * For example, in our standard person/student/employee/student-emp
42 : : * example, where student and employee inherit from person
43 : : * and student-emp inherits from student and employee, the
44 : : * query:
45 : : *
46 : : * select name from person
47 : : *
48 : : * generates the plan:
49 : : *
50 : : * |
51 : : * Append -------+-------+--------+--------+
52 : : * / \ | | | |
53 : : * nil nil Scan Scan Scan Scan
54 : : * | | | |
55 : : * person employee student student-emp
56 : : */
57 : :
58 : : #include "postgres.h"
59 : :
60 : : #include "executor/execAsync.h"
61 : : #include "executor/execPartition.h"
62 : : #include "executor/executor.h"
63 : : #include "executor/nodeAppend.h"
64 : : #include "miscadmin.h"
65 : : #include "pgstat.h"
66 : : #include "storage/latch.h"
67 : :
68 : : /* Shared state for parallel-aware Append. */
69 : : struct ParallelAppendState
70 : : {
71 : : LWLock pa_lock; /* mutual exclusion to choose next subplan */
72 : : int pa_next_plan; /* next plan to choose by any worker */
73 : :
74 : : /*
75 : : * pa_finished[i] should be true if no more workers should select subplan
76 : : * i. for a non-partial plan, this should be set to true as soon as a
77 : : * worker selects the plan; for a partial plan, it remains false until
78 : : * some worker executes the plan to completion.
79 : : */
80 : : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
81 : : };
82 : :
83 : : #define INVALID_SUBPLAN_INDEX -1
84 : : #define EVENT_BUFFER_SIZE 16
85 : :
86 : : static TupleTableSlot *ExecAppend(PlanState *pstate);
87 : : static bool choose_next_subplan_locally(AppendState *node);
88 : : static bool choose_next_subplan_for_leader(AppendState *node);
89 : : static bool choose_next_subplan_for_worker(AppendState *node);
90 : : static void mark_invalid_subplans_as_finished(AppendState *node);
91 : : static void ExecAppendAsyncBegin(AppendState *node);
92 : : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 : : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 : : static void ExecAppendAsyncEventWait(AppendState *node);
95 : : static void classify_matching_subplans(AppendState *node);
96 : :
97 : : /* ----------------------------------------------------------------
98 : : * ExecInitAppend
99 : : *
100 : : * Begin all of the subscans of the append node.
101 : : *
102 : : * (This is potentially wasteful, since the entire result of the
103 : : * append node may not be scanned, but this way all of the
104 : : * structures get allocated in the executor's top level memory
105 : : * block instead of that of the call to ExecAppend.)
106 : : * ----------------------------------------------------------------
107 : : */
108 : : AppendState *
6620 tgl@sss.pgh.pa.us 109 :CBC 6881 : ExecInitAppend(Append *node, EState *estate, int eflags)
110 : : {
7801 111 : 6881 : AppendState *appendstate = makeNode(AppendState);
112 : : PlanState **appendplanstates;
113 : : Bitmapset *validsubplans;
114 : : Bitmapset *asyncplans;
115 : : int nplans;
116 : : int nasyncplans;
117 : : int firstvalid;
118 : : int i,
119 : : j;
120 : :
121 : : /* check for unsupported flags */
6620 122 [ - + ]: 6881 : Assert(!(eflags & EXEC_FLAG_MARK));
123 : :
124 : : /*
125 : : * create new AppendState for our append node
126 : : */
7801 127 : 6881 : appendstate->ps.plan = (Plan *) node;
128 : 6881 : appendstate->ps.state = estate;
2463 andres@anarazel.de 129 : 6881 : appendstate->ps.ExecProcNode = ExecAppend;
130 : :
131 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2199 alvherre@alvh.no-ip. 132 : 6881 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
1110 efujita@postgresql.o 133 : 6881 : appendstate->as_syncdone = false;
134 : 6881 : appendstate->as_begun = false;
135 : :
136 : : /* If run-time partition pruning is enabled, then set that up now */
346 alvherre@alvh.no-ip. 137 [ + + ]: 6881 : if (node->part_prune_info != NULL)
138 : : {
139 : : PartitionPruneState *prunestate;
140 : :
141 : : /*
142 : : * Set up pruning data structure. This also initializes the set of
143 : : * subplans to initialize (validsubplans) by taking into account the
144 : : * result of performing initial pruning if any.
145 : : */
740 146 : 297 : prunestate = ExecInitPartitionPruning(&appendstate->ps,
147 : 297 : list_length(node->appendplans),
346 148 : 297 : node->part_prune_info,
149 : : &validsubplans);
2132 tgl@sss.pgh.pa.us 150 : 297 : appendstate->as_prune_state = prunestate;
740 alvherre@alvh.no-ip. 151 : 297 : nplans = bms_num_members(validsubplans);
152 : :
153 : : /*
154 : : * When no run-time pruning is required and there's at least one
155 : : * subplan, we can fill as_valid_subplans immediately, preventing
156 : : * later calls to ExecFindMatchingSubPlans.
157 : : */
1586 tgl@sss.pgh.pa.us 158 [ + + + + ]: 297 : if (!prunestate->do_exec_prune && nplans > 0)
159 : : {
2199 alvherre@alvh.no-ip. 160 : 91 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
409 tgl@sss.pgh.pa.us 161 : 91 : appendstate->as_valid_subplans_identified = true;
162 : : }
163 : : }
164 : : else
165 : : {
2199 alvherre@alvh.no-ip. 166 : 6584 : nplans = list_length(node->appendplans);
167 : :
168 : : /*
169 : : * When run-time partition pruning is not enabled we can just mark all
170 : : * subplans as valid; they must also all be initialized.
171 : : */
2085 172 [ - + ]: 6584 : Assert(nplans > 0);
2199 173 : 6584 : appendstate->as_valid_subplans = validsubplans =
174 : 6584 : bms_add_range(NULL, 0, nplans - 1);
409 tgl@sss.pgh.pa.us 175 : 6584 : appendstate->as_valid_subplans_identified = true;
2199 alvherre@alvh.no-ip. 176 : 6584 : appendstate->as_prune_state = NULL;
177 : : }
178 : :
179 : : /*
180 : : * Initialize result tuple type and slot.
181 : : */
1977 andres@anarazel.de 182 : 6881 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
183 : :
184 : : /* node returns slots from each of its subnodes, therefore not fixed */
185 : 6881 : appendstate->ps.resultopsset = true;
186 : 6881 : appendstate->ps.resultopsfixed = false;
187 : :
2199 alvherre@alvh.no-ip. 188 : 6881 : appendplanstates = (PlanState **) palloc(nplans *
189 : : sizeof(PlanState *));
190 : :
191 : : /*
192 : : * call ExecInitNode on each of the valid plans to be executed and save
193 : : * the results into the appendplanstates array.
194 : : *
195 : : * While at it, find out the first valid partial plan.
196 : : */
1728 drowley@postgresql.o 197 : 6881 : j = 0;
1110 efujita@postgresql.o 198 : 6881 : asyncplans = NULL;
199 : 6881 : nasyncplans = 0;
2189 alvherre@alvh.no-ip. 200 : 6881 : firstvalid = nplans;
1728 drowley@postgresql.o 201 : 6881 : i = -1;
202 [ + + ]: 27132 : while ((i = bms_next_member(validsubplans, i)) >= 0)
203 : : {
204 : 20251 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
205 : :
206 : : /*
207 : : * Record async subplans. When executing EvalPlanQual, we treat them
208 : : * as sync ones; don't do this when initializing an EvalPlanQual plan
209 : : * tree.
210 : : */
1110 efujita@postgresql.o 211 [ + + + - ]: 20251 : if (initNode->async_capable && estate->es_epq_active == NULL)
212 : : {
213 : 93 : asyncplans = bms_add_member(asyncplans, j);
214 : 93 : nasyncplans++;
215 : : }
216 : :
217 : : /*
218 : : * Record the lowest appendplans index which is a valid partial plan.
219 : : */
1728 drowley@postgresql.o 220 [ + + + + ]: 20251 : if (i >= node->first_partial_plan && j < firstvalid)
221 : 225 : firstvalid = j;
222 : :
223 : 20251 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
224 : : }
225 : :
2189 alvherre@alvh.no-ip. 226 : 6881 : appendstate->as_first_partial_plan = firstvalid;
2199 227 : 6881 : appendstate->appendplans = appendplanstates;
228 : 6881 : appendstate->as_nplans = nplans;
229 : :
230 : : /* Initialize async state */
1110 efujita@postgresql.o 231 : 6881 : appendstate->as_asyncplans = asyncplans;
232 : 6881 : appendstate->as_nasyncplans = nasyncplans;
233 : 6881 : appendstate->as_asyncrequests = NULL;
1042 234 : 6881 : appendstate->as_asyncresults = NULL;
235 : 6881 : appendstate->as_nasyncresults = 0;
236 : 6881 : appendstate->as_nasyncremain = 0;
1110 237 : 6881 : appendstate->as_needrequest = NULL;
238 : 6881 : appendstate->as_eventset = NULL;
1042 239 : 6881 : appendstate->as_valid_asyncplans = NULL;
240 : :
1110 241 [ + + ]: 6881 : if (nasyncplans > 0)
242 : : {
243 : 47 : appendstate->as_asyncrequests = (AsyncRequest **)
244 : 47 : palloc0(nplans * sizeof(AsyncRequest *));
245 : :
246 : 47 : i = -1;
247 [ + + ]: 140 : while ((i = bms_next_member(asyncplans, i)) >= 0)
248 : : {
249 : : AsyncRequest *areq;
250 : :
251 : 93 : areq = palloc(sizeof(AsyncRequest));
252 : 93 : areq->requestor = (PlanState *) appendstate;
253 : 93 : areq->requestee = appendplanstates[i];
254 : 93 : areq->request_index = i;
255 : 93 : areq->callback_pending = false;
256 : 93 : areq->request_complete = false;
257 : 93 : areq->result = NULL;
258 : :
259 : 93 : appendstate->as_asyncrequests[i] = areq;
260 : : }
261 : :
1042 262 : 47 : appendstate->as_asyncresults = (TupleTableSlot **)
263 : 47 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
264 : :
409 tgl@sss.pgh.pa.us 265 [ + + ]: 47 : if (appendstate->as_valid_subplans_identified)
1042 efujita@postgresql.o 266 : 44 : classify_matching_subplans(appendstate);
267 : : }
268 : :
269 : : /*
270 : : * Miscellaneous initialization
271 : : */
272 : :
2199 alvherre@alvh.no-ip. 273 : 6881 : appendstate->ps.ps_ProjInfo = NULL;
274 : :
275 : : /* For parallel query, this will be overridden later. */
2322 rhaas@postgresql.org 276 : 6881 : appendstate->choose_next_subplan = choose_next_subplan_locally;
277 : :
7801 tgl@sss.pgh.pa.us 278 : 6881 : return appendstate;
279 : : }
280 : :
281 : : /* ----------------------------------------------------------------
282 : : * ExecAppend
283 : : *
284 : : * Handles iteration over multiple subplans.
285 : : * ----------------------------------------------------------------
286 : : */
287 : : static TupleTableSlot *
2463 andres@anarazel.de 288 : 1724645 : ExecAppend(PlanState *pstate)
289 : : {
290 : 1724645 : AppendState *node = castNode(AppendState, pstate);
291 : : TupleTableSlot *result;
292 : :
293 : : /*
294 : : * If this is the first call after Init or ReScan, we need to do the
295 : : * initialization work.
296 : : */
1110 efujita@postgresql.o 297 [ + + ]: 1724645 : if (!node->as_begun)
298 : : {
299 [ - + ]: 13231 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
300 [ - + ]: 13231 : Assert(!node->as_syncdone);
301 : :
302 : : /* Nothing to do if there are no subplans */
1586 tgl@sss.pgh.pa.us 303 [ + + ]: 13231 : if (node->as_nplans == 0)
304 : 18 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
305 : :
306 : : /* If there are any async subplans, begin executing them. */
1110 efujita@postgresql.o 307 [ + + ]: 13213 : if (node->as_nasyncplans > 0)
308 : 37 : ExecAppendAsyncBegin(node);
309 : :
310 : : /*
311 : : * If no sync subplan has been chosen, we must choose one before
312 : : * proceeding.
313 : : */
314 [ + + + + ]: 13213 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
2199 alvherre@alvh.no-ip. 315 : 1573 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
316 : :
1110 efujita@postgresql.o 317 [ + + + - : 11640 : Assert(node->as_syncdone ||
- + ]
318 : : (node->as_whichplan >= 0 &&
319 : : node->as_whichplan < node->as_nplans));
320 : :
321 : : /* And we're initialized. */
322 : 11640 : node->as_begun = true;
323 : : }
324 : :
325 : : for (;;)
9716 bruce@momjian.us 326 : 15653 : {
327 : : PlanState *subnode;
328 : :
2455 andres@anarazel.de 329 [ - + ]: 1738707 : CHECK_FOR_INTERRUPTS();
330 : :
331 : : /*
332 : : * try to get a tuple from an async subplan if any
333 : : */
1110 efujita@postgresql.o 334 [ + + - + ]: 1738707 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
335 : : {
336 [ + - ]: 6138 : if (ExecAppendAsyncGetNext(node, &result))
337 : 6137 : return result;
1110 efujita@postgresql.o 338 [ # # ]:UBC 0 : Assert(!node->as_syncdone);
339 [ # # ]: 0 : Assert(bms_is_empty(node->as_needrequest));
340 : : }
341 : :
342 : : /*
343 : : * figure out which sync subplan we are currently processing
344 : : */
2322 rhaas@postgresql.org 345 [ + - - + ]:CBC 1732569 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
6902 tgl@sss.pgh.pa.us 346 : 1732569 : subnode = node->appendplans[node->as_whichplan];
347 : :
348 : : /*
349 : : * get a tuple from the subplan
350 : : */
351 : 1732569 : result = ExecProcNode(subnode);
352 : :
353 [ + + + + ]: 1732543 : if (!TupIsNull(result))
354 : : {
355 : : /*
356 : : * If the subplan gave us something then return it as-is. We do
357 : : * NOT make use of the result slot that was set up in
358 : : * ExecInitAppend; there's no need for it.
359 : : */
360 : 1705541 : return result;
361 : : }
362 : :
363 : : /*
364 : : * wait or poll for async events if any. We do this before checking
365 : : * for the end of iteration, because it might drain the remaining
366 : : * async subplans.
367 : : */
1110 efujita@postgresql.o 368 [ + + ]: 27002 : if (node->as_nasyncremain > 0)
369 : 17 : ExecAppendAsyncEventWait(node);
370 : :
371 : : /* choose new sync subplan; if no sync/async subplans, we're done */
372 [ + + + + ]: 27002 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
6902 tgl@sss.pgh.pa.us 373 : 11349 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
374 : : }
375 : : }
376 : :
377 : : /* ----------------------------------------------------------------
378 : : * ExecEndAppend
379 : : *
380 : : * Shuts down the subscans of the append node.
381 : : *
382 : : * Returns nothing of interest.
383 : : * ----------------------------------------------------------------
384 : : */
385 : : void
7801 386 : 6742 : ExecEndAppend(AppendState *node)
387 : : {
388 : : PlanState **appendplans;
389 : : int nplans;
390 : : int i;
391 : :
392 : : /*
393 : : * get information from the node
394 : : */
9405 bruce@momjian.us 395 : 6742 : appendplans = node->appendplans;
7801 tgl@sss.pgh.pa.us 396 : 6742 : nplans = node->as_nplans;
397 : :
398 : : /*
399 : : * shut down each of the subscans
400 : : */
9716 bruce@momjian.us 401 [ + + ]: 26664 : for (i = 0; i < nplans; i++)
5300 tgl@sss.pgh.pa.us 402 : 19922 : ExecEndNode(appendplans[i]);
9716 bruce@momjian.us 403 : 6742 : }
404 : :
405 : : void
5025 tgl@sss.pgh.pa.us 406 : 8310 : ExecReScanAppend(AppendState *node)
407 : : {
1110 efujita@postgresql.o 408 : 8310 : int nasyncplans = node->as_nasyncplans;
409 : : int i;
410 : :
411 : : /*
412 : : * If any PARAM_EXEC Params used in pruning expressions have changed, then
413 : : * we'd better unset the valid subplans so that they are reselected for
414 : : * the new parameter values.
415 : : */
2199 alvherre@alvh.no-ip. 416 [ + + + - ]: 9944 : if (node->as_prune_state &&
417 : 1634 : bms_overlap(node->ps.chgParam,
2135 tgl@sss.pgh.pa.us 418 : 1634 : node->as_prune_state->execparamids))
419 : : {
409 420 : 1634 : node->as_valid_subplans_identified = false;
2199 alvherre@alvh.no-ip. 421 : 1634 : bms_free(node->as_valid_subplans);
422 : 1634 : node->as_valid_subplans = NULL;
409 tgl@sss.pgh.pa.us 423 : 1634 : bms_free(node->as_valid_asyncplans);
424 : 1634 : node->as_valid_asyncplans = NULL;
425 : : }
426 : :
5300 427 [ + + ]: 36345 : for (i = 0; i < node->as_nplans; i++)
428 : : {
7559 bruce@momjian.us 429 : 28035 : PlanState *subnode = node->appendplans[i];
430 : :
431 : : /*
432 : : * ExecReScan doesn't know about my subplans, so I have to do
433 : : * changed-parameter signaling myself.
434 : : */
7735 tgl@sss.pgh.pa.us 435 [ + + ]: 28035 : if (node->ps.chgParam != NULL)
436 : 26318 : UpdateChangedParamSet(subnode, node->ps.chgParam);
437 : :
438 : : /*
439 : : * If chgParam of subnode is not null then plan will be re-scanned by
440 : : * first ExecProcNode or by first ExecAsyncRequest.
441 : : */
5025 442 [ + + ]: 28035 : if (subnode->chgParam == NULL)
443 : 6483 : ExecReScan(subnode);
444 : : }
445 : :
446 : : /* Reset async state */
1110 efujita@postgresql.o 447 [ + + ]: 8310 : if (nasyncplans > 0)
448 : : {
449 : 17 : i = -1;
450 [ + + ]: 51 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
451 : : {
452 : 34 : AsyncRequest *areq = node->as_asyncrequests[i];
453 : :
454 : 34 : areq->callback_pending = false;
455 : 34 : areq->request_complete = false;
456 : 34 : areq->result = NULL;
457 : : }
458 : :
1042 459 : 17 : node->as_nasyncresults = 0;
460 : 17 : node->as_nasyncremain = 0;
1110 461 : 17 : bms_free(node->as_needrequest);
462 : 17 : node->as_needrequest = NULL;
463 : : }
464 : :
465 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2199 alvherre@alvh.no-ip. 466 : 8310 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
1110 efujita@postgresql.o 467 : 8310 : node->as_syncdone = false;
468 : 8310 : node->as_begun = false;
2322 rhaas@postgresql.org 469 : 8310 : }
470 : :
471 : : /* ----------------------------------------------------------------
472 : : * Parallel Append Support
473 : : * ----------------------------------------------------------------
474 : : */
475 : :
476 : : /* ----------------------------------------------------------------
477 : : * ExecAppendEstimate
478 : : *
479 : : * Compute the amount of space we'll need in the parallel
480 : : * query DSM, and inform pcxt->estimator about our needs.
481 : : * ----------------------------------------------------------------
482 : : */
483 : : void
484 : 69 : ExecAppendEstimate(AppendState *node,
485 : : ParallelContext *pcxt)
486 : : {
487 : 69 : node->pstate_len =
488 : 69 : add_size(offsetof(ParallelAppendState, pa_finished),
489 : 69 : sizeof(bool) * node->as_nplans);
490 : :
491 : 69 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
492 : 69 : shm_toc_estimate_keys(&pcxt->estimator, 1);
493 : 69 : }
494 : :
495 : :
496 : : /* ----------------------------------------------------------------
497 : : * ExecAppendInitializeDSM
498 : : *
499 : : * Set up shared state for Parallel Append.
500 : : * ----------------------------------------------------------------
501 : : */
502 : : void
503 : 69 : ExecAppendInitializeDSM(AppendState *node,
504 : : ParallelContext *pcxt)
505 : : {
506 : : ParallelAppendState *pstate;
507 : :
508 : 69 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
509 : 69 : memset(pstate, 0, node->pstate_len);
510 : 69 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
511 : 69 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
512 : :
513 : 69 : node->as_pstate = pstate;
514 : 69 : node->choose_next_subplan = choose_next_subplan_for_leader;
515 : 69 : }
516 : :
517 : : /* ----------------------------------------------------------------
518 : : * ExecAppendReInitializeDSM
519 : : *
520 : : * Reset shared state before beginning a fresh scan.
521 : : * ----------------------------------------------------------------
522 : : */
523 : : void
2322 rhaas@postgresql.org 524 :UBC 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
525 : : {
526 : 0 : ParallelAppendState *pstate = node->as_pstate;
527 : :
528 : 0 : pstate->pa_next_plan = 0;
529 : 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
530 : 0 : }
531 : :
532 : : /* ----------------------------------------------------------------
533 : : * ExecAppendInitializeWorker
534 : : *
535 : : * Copy relevant information from TOC into planstate, and initialize
536 : : * whatever is required to choose and execute the optimal subplan.
537 : : * ----------------------------------------------------------------
538 : : */
539 : : void
2322 rhaas@postgresql.org 540 :CBC 159 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
541 : : {
542 : 159 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
543 : 159 : node->choose_next_subplan = choose_next_subplan_for_worker;
544 : 159 : }
545 : :
546 : : /* ----------------------------------------------------------------
547 : : * choose_next_subplan_locally
548 : : *
549 : : * Choose next sync subplan for a non-parallel-aware Append,
550 : : * returning false if there are no more.
551 : : * ----------------------------------------------------------------
552 : : */
553 : : static bool
554 : 39725 : choose_next_subplan_locally(AppendState *node)
555 : : {
556 : 39725 : int whichplan = node->as_whichplan;
557 : : int nextplan;
558 : :
559 : : /* We should never be called when there are no subplans */
1586 tgl@sss.pgh.pa.us 560 [ - + ]: 39725 : Assert(node->as_nplans > 0);
561 : :
562 : : /* Nothing to do if syncdone */
1110 efujita@postgresql.o 563 [ + + ]: 39725 : if (node->as_syncdone)
564 : 18 : return false;
565 : :
566 : : /*
567 : : * If first call then have the bms member function choose the first valid
568 : : * sync subplan by initializing whichplan to -1. If there happen to be no
569 : : * valid sync subplans then the bms member function will handle that by
570 : : * returning a negative number which will allow us to exit returning a
571 : : * false value.
572 : : */
2199 alvherre@alvh.no-ip. 573 [ + + ]: 39707 : if (whichplan == INVALID_SUBPLAN_INDEX)
574 : : {
1110 efujita@postgresql.o 575 [ + + ]: 12994 : if (node->as_nasyncplans > 0)
576 : : {
577 : : /* We'd have filled as_valid_subplans already */
409 tgl@sss.pgh.pa.us 578 [ - + ]: 19 : Assert(node->as_valid_subplans_identified);
579 : : }
580 [ + + ]: 12975 : else if (!node->as_valid_subplans_identified)
581 : : {
2199 alvherre@alvh.no-ip. 582 : 1691 : node->as_valid_subplans =
740 583 : 1691 : ExecFindMatchingSubPlans(node->as_prune_state, false);
409 tgl@sss.pgh.pa.us 584 : 1691 : node->as_valid_subplans_identified = true;
585 : : }
586 : :
2199 alvherre@alvh.no-ip. 587 : 12994 : whichplan = -1;
588 : : }
589 : :
590 : : /* Ensure whichplan is within the expected range */
591 [ + - - + ]: 39707 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
592 : :
593 [ + + ]: 39707 : if (ScanDirectionIsForward(node->ps.state->es_direction))
594 : 39698 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
595 : : else
596 : 9 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
597 : :
598 [ + + ]: 39707 : if (nextplan < 0)
599 : : {
600 : : /* Set as_syncdone if in async mode */
1110 efujita@postgresql.o 601 [ + + ]: 12738 : if (node->as_nasyncplans > 0)
602 : 17 : node->as_syncdone = true;
2199 alvherre@alvh.no-ip. 603 : 12738 : return false;
604 : : }
605 : :
606 : 26969 : node->as_whichplan = nextplan;
607 : :
2322 rhaas@postgresql.org 608 : 26969 : return true;
609 : : }
610 : :
611 : : /* ----------------------------------------------------------------
612 : : * choose_next_subplan_for_leader
613 : : *
614 : : * Try to pick a plan which doesn't commit us to doing much
615 : : * work locally, so that as much work as possible is done in
616 : : * the workers. Cheapest subplans are at the end.
617 : : * ----------------------------------------------------------------
618 : : */
619 : : static bool
620 : 233 : choose_next_subplan_for_leader(AppendState *node)
621 : : {
622 : 233 : ParallelAppendState *pstate = node->as_pstate;
623 : :
624 : : /* Backward scan is not supported by parallel-aware plans */
625 [ - + ]: 233 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
626 : :
627 : : /* We should never be called when there are no subplans */
1586 tgl@sss.pgh.pa.us 628 [ - + ]: 233 : Assert(node->as_nplans > 0);
629 : :
2322 rhaas@postgresql.org 630 : 233 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
631 : :
632 [ + + ]: 233 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
633 : : {
634 : : /* Mark just-completed subplan as finished. */
635 : 173 : node->as_pstate->pa_finished[node->as_whichplan] = true;
636 : : }
637 : : else
638 : : {
639 : : /* Start with last subplan. */
640 : 60 : node->as_whichplan = node->as_nplans - 1;
641 : :
642 : : /*
643 : : * If we've yet to determine the valid subplans then do so now. If
644 : : * run-time pruning is disabled then the valid subplans will always be
645 : : * set to all subplans.
646 : : */
409 tgl@sss.pgh.pa.us 647 [ + + ]: 60 : if (!node->as_valid_subplans_identified)
648 : : {
2199 alvherre@alvh.no-ip. 649 : 12 : node->as_valid_subplans =
740 650 : 12 : ExecFindMatchingSubPlans(node->as_prune_state, false);
409 tgl@sss.pgh.pa.us 651 : 12 : node->as_valid_subplans_identified = true;
652 : :
653 : : /*
654 : : * Mark each invalid plan as finished to allow the loop below to
655 : : * select the first valid subplan.
656 : : */
2199 alvherre@alvh.no-ip. 657 : 12 : mark_invalid_subplans_as_finished(node);
658 : : }
659 : : }
660 : :
661 : : /* Loop until we find a subplan to execute. */
2322 rhaas@postgresql.org 662 [ + + ]: 383 : while (pstate->pa_finished[node->as_whichplan])
663 : : {
664 [ + + ]: 210 : if (node->as_whichplan == 0)
665 : : {
666 : 60 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
667 : 60 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
668 : 60 : LWLockRelease(&pstate->pa_lock);
669 : 60 : return false;
670 : : }
671 : :
672 : : /*
673 : : * We needn't pay attention to as_valid_subplans here as all invalid
674 : : * plans have been marked as finished.
675 : : */
676 : 150 : node->as_whichplan--;
677 : : }
678 : :
679 : : /* If non-partial, immediately mark as finished. */
2189 alvherre@alvh.no-ip. 680 [ + + ]: 173 : if (node->as_whichplan < node->as_first_partial_plan)
2322 rhaas@postgresql.org 681 : 45 : node->as_pstate->pa_finished[node->as_whichplan] = true;
682 : :
683 : 173 : LWLockRelease(&pstate->pa_lock);
684 : :
685 : 173 : return true;
686 : : }
687 : :
688 : : /* ----------------------------------------------------------------
689 : : * choose_next_subplan_for_worker
690 : : *
691 : : * Choose next subplan for a parallel-aware Append, returning
692 : : * false if there are no more.
693 : : *
694 : : * We start from the first plan and advance through the list;
695 : : * when we get back to the end, we loop back to the first
696 : : * partial plan. This assigns the non-partial plans first in
697 : : * order of descending cost and then spreads out the workers
698 : : * as evenly as possible across the remaining partial plans.
699 : : * ----------------------------------------------------------------
700 : : */
701 : : static bool
702 : 257 : choose_next_subplan_for_worker(AppendState *node)
703 : : {
704 : 257 : ParallelAppendState *pstate = node->as_pstate;
705 : :
706 : : /* Backward scan is not supported by parallel-aware plans */
707 [ - + ]: 257 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
708 : :
709 : : /* We should never be called when there are no subplans */
1586 tgl@sss.pgh.pa.us 710 [ - + ]: 257 : Assert(node->as_nplans > 0);
711 : :
2322 rhaas@postgresql.org 712 : 257 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
713 : :
714 : : /* Mark just-completed subplan as finished. */
715 [ + + ]: 257 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
716 : 116 : node->as_pstate->pa_finished[node->as_whichplan] = true;
717 : :
718 : : /*
719 : : * If we've yet to determine the valid subplans then do so now. If
720 : : * run-time pruning is disabled then the valid subplans will always be set
721 : : * to all subplans.
722 : : */
409 tgl@sss.pgh.pa.us 723 [ + + ]: 141 : else if (!node->as_valid_subplans_identified)
724 : : {
2199 alvherre@alvh.no-ip. 725 : 12 : node->as_valid_subplans =
740 726 : 12 : ExecFindMatchingSubPlans(node->as_prune_state, false);
409 tgl@sss.pgh.pa.us 727 : 12 : node->as_valid_subplans_identified = true;
728 : :
2199 alvherre@alvh.no-ip. 729 : 12 : mark_invalid_subplans_as_finished(node);
730 : : }
731 : :
732 : : /* If all the plans are already done, we have nothing to do */
2322 rhaas@postgresql.org 733 [ + + ]: 257 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
734 : : {
735 : 123 : LWLockRelease(&pstate->pa_lock);
736 : 123 : return false;
737 : : }
738 : :
739 : : /* Save the plan from which we are starting the search. */
2257 740 : 134 : node->as_whichplan = pstate->pa_next_plan;
741 : :
742 : : /* Loop until we find a valid subplan to execute. */
2322 743 [ + + ]: 232 : while (pstate->pa_finished[pstate->pa_next_plan])
744 : : {
745 : : int nextplan;
746 : :
2197 alvherre@alvh.no-ip. 747 : 116 : nextplan = bms_next_member(node->as_valid_subplans,
748 : : pstate->pa_next_plan);
749 [ + + ]: 116 : if (nextplan >= 0)
750 : : {
751 : : /* Advance to the next valid plan. */
752 : 81 : pstate->pa_next_plan = nextplan;
753 : : }
2189 754 [ + + ]: 35 : else if (node->as_whichplan > node->as_first_partial_plan)
755 : : {
756 : : /*
757 : : * Try looping back to the first valid partial plan, if there is
758 : : * one. If there isn't, arrange to bail out below.
759 : : */
2197 760 : 28 : nextplan = bms_next_member(node->as_valid_subplans,
2189 761 : 28 : node->as_first_partial_plan - 1);
2197 762 : 28 : pstate->pa_next_plan =
763 [ - + ]: 28 : nextplan < 0 ? node->as_whichplan : nextplan;
764 : : }
765 : : else
766 : : {
767 : : /*
768 : : * At last plan, and either there are no partial plans or we've
769 : : * tried them all. Arrange to bail out.
770 : : */
2322 rhaas@postgresql.org 771 : 7 : pstate->pa_next_plan = node->as_whichplan;
772 : : }
773 : :
774 [ + + ]: 116 : if (pstate->pa_next_plan == node->as_whichplan)
775 : : {
776 : : /* We've tried everything! */
777 : 18 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
778 : 18 : LWLockRelease(&pstate->pa_lock);
779 : 18 : return false;
780 : : }
781 : : }
782 : :
783 : : /* Pick the plan we found, and advance pa_next_plan one more time. */
2197 alvherre@alvh.no-ip. 784 : 116 : node->as_whichplan = pstate->pa_next_plan;
785 : 116 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
786 : : pstate->pa_next_plan);
787 : :
788 : : /*
789 : : * If there are no more valid plans then try setting the next plan to the
790 : : * first valid partial plan.
791 : : */
792 [ + + ]: 116 : if (pstate->pa_next_plan < 0)
793 : : {
794 : 19 : int nextplan = bms_next_member(node->as_valid_subplans,
2189 795 : 19 : node->as_first_partial_plan - 1);
796 : :
2197 797 [ + - ]: 19 : if (nextplan >= 0)
798 : 19 : pstate->pa_next_plan = nextplan;
799 : : else
800 : : {
801 : : /*
802 : : * There are no valid partial plans, and we already chose the last
803 : : * non-partial plan; so flag that there's nothing more for our
804 : : * fellow workers to do.
805 : : */
2321 rhaas@postgresql.org 806 :UBC 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
807 : : }
808 : : }
809 : :
810 : : /* If non-partial, immediately mark as finished. */
2189 alvherre@alvh.no-ip. 811 [ + + ]:CBC 116 : if (node->as_whichplan < node->as_first_partial_plan)
2322 rhaas@postgresql.org 812 : 24 : node->as_pstate->pa_finished[node->as_whichplan] = true;
813 : :
814 : 116 : LWLockRelease(&pstate->pa_lock);
815 : :
816 : 116 : return true;
817 : : }
818 : :
819 : : /*
820 : : * mark_invalid_subplans_as_finished
821 : : * Marks the ParallelAppendState's pa_finished as true for each invalid
822 : : * subplan.
823 : : *
824 : : * This function should only be called for parallel Append with run-time
825 : : * pruning enabled.
826 : : */
827 : : static void
2199 alvherre@alvh.no-ip. 828 : 24 : mark_invalid_subplans_as_finished(AppendState *node)
829 : : {
830 : : int i;
831 : :
832 : : /* Only valid to call this while in parallel Append mode */
833 [ - + ]: 24 : Assert(node->as_pstate);
834 : :
835 : : /* Shouldn't have been called when run-time pruning is not enabled */
836 [ - + ]: 24 : Assert(node->as_prune_state);
837 : :
838 : : /* Nothing to do if all plans are valid */
839 [ - + ]: 24 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
2199 alvherre@alvh.no-ip. 840 :UBC 0 : return;
841 : :
842 : : /* Mark all non-valid plans as finished */
2199 alvherre@alvh.no-ip. 843 [ + + ]:CBC 81 : for (i = 0; i < node->as_nplans; i++)
844 : : {
845 [ + + ]: 57 : if (!bms_is_member(i, node->as_valid_subplans))
846 : 24 : node->as_pstate->pa_finished[i] = true;
847 : : }
848 : : }
849 : :
850 : : /* ----------------------------------------------------------------
851 : : * Asynchronous Append Support
852 : : * ----------------------------------------------------------------
853 : : */
854 : :
855 : : /* ----------------------------------------------------------------
856 : : * ExecAppendAsyncBegin
857 : : *
858 : : * Begin executing designed async-capable subplans.
859 : : * ----------------------------------------------------------------
860 : : */
861 : : static void
1110 efujita@postgresql.o 862 : 37 : ExecAppendAsyncBegin(AppendState *node)
863 : : {
864 : : int i;
865 : :
866 : : /* Backward scan is not supported by async-aware Appends. */
867 [ - + ]: 37 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
868 : :
869 : : /* We should never be called when there are no subplans */
1042 870 [ - + ]: 37 : Assert(node->as_nplans > 0);
871 : :
872 : : /* We should never be called when there are no async subplans. */
1110 873 [ - + ]: 37 : Assert(node->as_nasyncplans > 0);
874 : :
875 : : /* If we've yet to determine the valid subplans then do so now. */
409 tgl@sss.pgh.pa.us 876 [ + + ]: 37 : if (!node->as_valid_subplans_identified)
877 : : {
1110 efujita@postgresql.o 878 : 2 : node->as_valid_subplans =
740 alvherre@alvh.no-ip. 879 : 2 : ExecFindMatchingSubPlans(node->as_prune_state, false);
409 tgl@sss.pgh.pa.us 880 : 2 : node->as_valid_subplans_identified = true;
881 : :
1042 efujita@postgresql.o 882 : 2 : classify_matching_subplans(node);
883 : : }
884 : :
885 : : /* Initialize state variables. */
886 : 37 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
887 : 37 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
888 : :
889 : : /* Nothing to do if there are no valid async subplans. */
1110 890 [ - + ]: 37 : if (node->as_nasyncremain == 0)
1110 efujita@postgresql.o 891 :UBC 0 : return;
892 : :
893 : : /* Make a request for each of the valid async subplans. */
1110 efujita@postgresql.o 894 :CBC 37 : i = -1;
895 [ + + ]: 109 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
896 : : {
897 : 72 : AsyncRequest *areq = node->as_asyncrequests[i];
898 : :
899 [ - + ]: 72 : Assert(areq->request_index == i);
900 [ - + ]: 72 : Assert(!areq->callback_pending);
901 : :
902 : : /* Do the actual work. */
903 : 72 : ExecAsyncRequest(areq);
904 : : }
905 : : }
906 : :
907 : : /* ----------------------------------------------------------------
908 : : * ExecAppendAsyncGetNext
909 : : *
910 : : * Get the next tuple from any of the asynchronous subplans.
911 : : * ----------------------------------------------------------------
912 : : */
913 : : static bool
914 : 6138 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
915 : : {
916 : 6138 : *result = NULL;
917 : :
918 : : /* We should never be called when there are no valid async subplans. */
919 [ - + ]: 6138 : Assert(node->as_nasyncremain > 0);
920 : :
921 : : /* Request a tuple asynchronously. */
922 [ + + ]: 6138 : if (ExecAppendAsyncRequest(node, result))
923 : 6021 : return true;
924 : :
925 [ + + ]: 174 : while (node->as_nasyncremain > 0)
926 : : {
927 [ - + ]: 143 : CHECK_FOR_INTERRUPTS();
928 : :
929 : : /* Wait or poll for async events. */
930 : 143 : ExecAppendAsyncEventWait(node);
931 : :
932 : : /* Request a tuple asynchronously. */
933 [ + + ]: 142 : if (ExecAppendAsyncRequest(node, result))
934 : 85 : return true;
935 : :
936 : : /* Break from loop if there's any sync subplan that isn't complete. */
937 [ - + ]: 57 : if (!node->as_syncdone)
1110 efujita@postgresql.o 938 :UBC 0 : break;
939 : : }
940 : :
941 : : /*
942 : : * If all sync subplans are complete, we're totally done scanning the
943 : : * given node. Otherwise, we're done with the asynchronous stuff but must
944 : : * continue scanning the sync subplans.
945 : : */
1110 efujita@postgresql.o 946 [ + - ]:CBC 31 : if (node->as_syncdone)
947 : : {
948 [ - + ]: 31 : Assert(node->as_nasyncremain == 0);
949 : 31 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
950 : 31 : return true;
951 : : }
952 : :
1110 efujita@postgresql.o 953 :UBC 0 : return false;
954 : : }
955 : :
956 : : /* ----------------------------------------------------------------
957 : : * ExecAppendAsyncRequest
958 : : *
959 : : * Request a tuple asynchronously.
960 : : * ----------------------------------------------------------------
961 : : */
962 : : static bool
1110 efujita@postgresql.o 963 :CBC 6280 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
964 : : {
965 : : Bitmapset *needrequest;
966 : : int i;
967 : :
968 : : /* Nothing to do if there are no async subplans needing a new request. */
969 [ + + ]: 6280 : if (bms_is_empty(node->as_needrequest))
970 : : {
1087 971 [ - + ]: 87 : Assert(node->as_nasyncresults == 0);
1110 972 : 87 : return false;
973 : : }
974 : :
975 : : /*
976 : : * If there are any asynchronously-generated results that have not yet
977 : : * been returned, we have nothing to do; just return one of them.
978 : : */
979 [ + + ]: 6193 : if (node->as_nasyncresults > 0)
980 : : {
981 : 290 : --node->as_nasyncresults;
982 : 290 : *result = node->as_asyncresults[node->as_nasyncresults];
983 : 290 : return true;
984 : : }
985 : :
986 : : /* Make a new request for each of the async subplans that need it. */
987 : 5903 : needrequest = node->as_needrequest;
988 : 5903 : node->as_needrequest = NULL;
989 : 5903 : i = -1;
990 [ + + ]: 12006 : while ((i = bms_next_member(needrequest, i)) >= 0)
991 : : {
992 : 6103 : AsyncRequest *areq = node->as_asyncrequests[i];
993 : :
994 : : /* Do the actual work. */
995 : 6103 : ExecAsyncRequest(areq);
996 : : }
997 : 5903 : bms_free(needrequest);
998 : :
999 : : /* Return one of the asynchronously-generated results if any. */
1000 [ + + ]: 5903 : if (node->as_nasyncresults > 0)
1001 : : {
1002 : 5816 : --node->as_nasyncresults;
1003 : 5816 : *result = node->as_asyncresults[node->as_nasyncresults];
1004 : 5816 : return true;
1005 : : }
1006 : :
1007 : 87 : return false;
1008 : : }
1009 : :
1010 : : /* ----------------------------------------------------------------
1011 : : * ExecAppendAsyncEventWait
1012 : : *
1013 : : * Wait or poll for file descriptor events and fire callbacks.
1014 : : * ----------------------------------------------------------------
1015 : : */
1016 : : static void
1017 : 160 : ExecAppendAsyncEventWait(AppendState *node)
1018 : : {
1087 1019 : 160 : int nevents = node->as_nasyncplans + 1;
1110 1020 [ + + ]: 160 : long timeout = node->as_syncdone ? -1 : 0;
1021 : : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1022 : : int noccurred;
1023 : : int i;
1024 : :
1025 : : /* We should never be called when there are no valid async subplans. */
1026 [ - + ]: 160 : Assert(node->as_nasyncremain > 0);
1027 : :
143 heikki.linnakangas@i 1028 [ - + ]: 160 : Assert(node->as_eventset == NULL);
143 heikki.linnakangas@i 1029 :GNC 160 : node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1110 efujita@postgresql.o 1030 : 160 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1031 : : NULL, NULL);
1032 : :
1033 : : /* Give each waiting subplan a chance to add an event. */
1034 : 160 : i = -1;
1035 [ + + ]: 490 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1036 : : {
1037 : 331 : AsyncRequest *areq = node->as_asyncrequests[i];
1038 : :
1039 [ + + ]: 331 : if (areq->callback_pending)
1040 : 283 : ExecAsyncConfigureWait(areq);
1041 : : }
1042 : :
1043 : : /*
1044 : : * No need for further processing if there are no configured events other
1045 : : * than the postmaster death event.
1046 : : */
989 1047 [ + + ]: 159 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1048 : : {
989 efujita@postgresql.o 1049 :CBC 1 : FreeWaitEventSet(node->as_eventset);
1050 : 1 : node->as_eventset = NULL;
989 efujita@postgresql.o 1051 :GNC 11 : return;
1052 : : }
1053 : :
1054 : : /* Return at most EVENT_BUFFER_SIZE events in one call. */
1087 1055 [ - + ]: 158 : if (nevents > EVENT_BUFFER_SIZE)
1087 efujita@postgresql.o 1056 :UNC 0 : nevents = EVENT_BUFFER_SIZE;
1057 : :
1058 : : /*
1059 : : * If the timeout is -1, wait until at least one event occurs. If the
1060 : : * timeout is 0, poll for events, but do not wait at all.
1061 : : */
1110 efujita@postgresql.o 1062 :GNC 158 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1063 : : nevents, WAIT_EVENT_APPEND_READY);
1064 : 158 : FreeWaitEventSet(node->as_eventset);
1065 : 158 : node->as_eventset = NULL;
1110 efujita@postgresql.o 1066 [ + + ]:CBC 158 : if (noccurred == 0)
1067 : 10 : return;
1068 : :
1069 : : /* Deliver notifications. */
1070 [ + + ]: 296 : for (i = 0; i < noccurred; i++)
1071 : : {
1072 : 148 : WaitEvent *w = &occurred_event[i];
1073 : :
1074 : : /*
1075 : : * Each waiting subplan should have registered its wait event with
1076 : : * user_data pointing back to its AsyncRequest.
1077 : : */
1078 [ + - ]: 148 : if ((w->events & WL_SOCKET_READABLE) != 0)
1079 : : {
1080 : 148 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1081 : :
986 1082 [ + - ]: 148 : if (areq->callback_pending)
1083 : : {
1084 : : /*
1085 : : * Mark it as no longer needing a callback. We must do this
1086 : : * before dispatching the callback in case the callback resets
1087 : : * the flag.
1088 : : */
1089 : 148 : areq->callback_pending = false;
1090 : :
1091 : : /* Do the actual work. */
1092 : 148 : ExecAsyncNotify(areq);
1093 : : }
1094 : : }
1095 : : }
1096 : : }
1097 : :
1098 : : /* ----------------------------------------------------------------
1099 : : * ExecAsyncAppendResponse
1100 : : *
1101 : : * Receive a response from an asynchronous request we made.
1102 : : * ----------------------------------------------------------------
1103 : : */
1104 : : void
1110 1105 : 6328 : ExecAsyncAppendResponse(AsyncRequest *areq)
1106 : : {
1107 : 6328 : AppendState *node = (AppendState *) areq->requestor;
1108 : 6328 : TupleTableSlot *slot = areq->result;
1109 : :
1110 : : /* The result should be a TupleTableSlot or NULL. */
1111 [ + + - + ]: 6328 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1112 : :
1113 : : /* Nothing to do if the request is pending. */
1114 [ + + ]: 6328 : if (!areq->request_complete)
1115 : : {
1116 : : /* The request would have been pending for a callback. */
1117 [ - + ]: 161 : Assert(areq->callback_pending);
1118 : 161 : return;
1119 : : }
1120 : :
1121 : : /* If the result is NULL or an empty slot, there's nothing more to do. */
1122 [ + + - + ]: 6167 : if (TupIsNull(slot))
1123 : : {
1124 : : /* The ending subplan wouldn't have been pending for a callback. */
1125 [ - + ]: 60 : Assert(!areq->callback_pending);
1126 : 60 : --node->as_nasyncremain;
1127 : 60 : return;
1128 : : }
1129 : :
1130 : : /* Save result so we can return it. */
1131 [ - + ]: 6107 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1132 : 6107 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1133 : :
1134 : : /*
1135 : : * Mark the subplan that returned a result as ready for a new request. We
1136 : : * don't launch another one here immediately because it might complete.
1137 : : */
1138 : 6107 : node->as_needrequest = bms_add_member(node->as_needrequest,
1139 : : areq->request_index);
1140 : : }
1141 : :
1142 : : /* ----------------------------------------------------------------
1143 : : * classify_matching_subplans
1144 : : *
1145 : : * Classify the node's as_valid_subplans into sync ones and
1146 : : * async ones, adjust it to contain sync ones only, and save
1147 : : * async ones in the node's as_valid_asyncplans.
1148 : : * ----------------------------------------------------------------
1149 : : */
1150 : : static void
1151 : 46 : classify_matching_subplans(AppendState *node)
1152 : : {
1153 : : Bitmapset *valid_asyncplans;
1154 : :
409 tgl@sss.pgh.pa.us 1155 [ - + ]: 46 : Assert(node->as_valid_subplans_identified);
1110 efujita@postgresql.o 1156 [ - + ]: 46 : Assert(node->as_valid_asyncplans == NULL);
1157 : :
1158 : : /* Nothing to do if there are no valid subplans. */
1159 [ - + ]: 46 : if (bms_is_empty(node->as_valid_subplans))
1160 : : {
1110 efujita@postgresql.o 1161 :UBC 0 : node->as_syncdone = true;
1162 : 0 : node->as_nasyncremain = 0;
1163 : 0 : return;
1164 : : }
1165 : :
1166 : : /* Nothing to do if there are no valid async subplans. */
1110 efujita@postgresql.o 1167 [ - + ]:CBC 46 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1168 : : {
1110 efujita@postgresql.o 1169 :UBC 0 : node->as_nasyncremain = 0;
1170 : 0 : return;
1171 : : }
1172 : :
1173 : : /* Get valid async subplans. */
409 tgl@sss.pgh.pa.us 1174 :CBC 46 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1175 : 46 : node->as_valid_subplans);
1176 : :
1177 : : /* Adjust the valid subplans to contain sync subplans only. */
1110 efujita@postgresql.o 1178 : 46 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1179 : : valid_asyncplans);
1180 : :
1181 : : /* Save valid async subplans. */
1182 : 46 : node->as_valid_asyncplans = valid_asyncplans;
1183 : : }
|