TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitMergeAppend - initialize the MergeAppend node
17 : * ExecMergeAppend - retrieve the next tuple from the node
18 : * ExecEndMergeAppend - shut down the MergeAppend node
19 : * ExecReScanMergeAppend - rescan the MergeAppend node
20 : *
21 : * NOTES
22 : * A MergeAppend node contains a list of one or more subplans.
23 : * These are each expected to deliver tuples that are sorted according
24 : * to a common sort key. The MergeAppend node merges these streams
25 : * to produce output sorted the same way.
26 : *
27 : * MergeAppend nodes don't make use of their left and right
28 : * subtrees, rather they maintain a list of subplans so
29 : * a typical MergeAppend node looks like this in the plan tree:
30 : *
31 : * ...
32 : * /
33 : * MergeAppend---+------+------+--- nil
34 : * / \ | | |
35 : * nil nil ... ... ...
36 : * subplans
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "executor/execdebug.h"
42 : #include "executor/execPartition.h"
43 : #include "executor/nodeMergeAppend.h"
44 : #include "lib/binaryheap.h"
45 : #include "miscadmin.h"
46 :
/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 */
typedef int32 SlotNumber;

/* Per-tuple entry point; installed as ps.ExecProcNode by ExecInitMergeAppend */
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
/* Comparator passed to binaryheap_allocate(); a and b carry SlotNumbers */
static int heap_compare_slots(Datum a, Datum b, void *arg);
56 :
57 :
58 : /* ----------------------------------------------------------------
59 : * ExecInitMergeAppend
60 : *
61 : * Begin all of the subscans of the MergeAppend node.
62 : * ----------------------------------------------------------------
63 : */
64 : MergeAppendState *
65 CBC 207 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 : {
67 207 : MergeAppendState *mergestate = makeNode(MergeAppendState);
68 : PlanState **mergeplanstates;
69 : Bitmapset *validsubplans;
70 : int nplans;
71 : int i,
72 : j;
73 :
74 : /* check for unsupported flags */
75 207 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76 :
77 : /*
78 : * create new MergeAppendState for our node
79 : */
80 207 : mergestate->ps.plan = (Plan *) node;
81 207 : mergestate->ps.state = estate;
82 207 : mergestate->ps.ExecProcNode = ExecMergeAppend;
83 :
84 : /* If run-time partition pruning is enabled, then set that up now */
85 GNC 207 : if (node->part_prune_index >= 0)
86 : {
87 : PartitionPruneState *prunestate;
88 :
89 : /*
90 : * Set up pruning data structure. This also initializes the set of
91 : * subplans to initialize (validsubplans) by taking into account the
92 : * result of performing initial pruning if any.
93 : */
94 CBC 27 : prunestate = ExecInitPartitionPruning(&mergestate->ps,
95 27 : list_length(node->mergeplans),
96 : node->part_prune_index,
97 : node->apprelids,
98 : &validsubplans);
99 GIC 27 : mergestate->ms_prune_state = prunestate;
100 CBC 27 : nplans = bms_num_members(validsubplans);
101 ECB :
102 : /*
103 : * When no run-time pruning is required and there's at least one
104 : * subplan, we can fill ms_valid_subplans immediately, preventing
105 : * later calls to ExecFindMatchingSubPlans.
106 : */
107 GIC 27 : if (!prunestate->do_exec_prune && nplans > 0)
108 CBC 12 : mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
109 ECB : }
110 : else
111 : {
112 GIC 180 : nplans = list_length(node->mergeplans);
113 ECB :
114 : /*
115 : * When run-time partition pruning is not enabled we can just mark all
116 : * subplans as valid; they must also all be initialized.
117 : */
118 GIC 180 : Assert(nplans > 0);
119 CBC 180 : mergestate->ms_valid_subplans = validsubplans =
120 180 : bms_add_range(NULL, 0, nplans - 1);
121 180 : mergestate->ms_prune_state = NULL;
122 ECB : }
123 :
124 GIC 207 : mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
125 CBC 207 : mergestate->mergeplans = mergeplanstates;
126 207 : mergestate->ms_nplans = nplans;
127 ECB :
128 GIC 207 : mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
129 CBC 207 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
130 ECB : mergestate);
131 :
132 : /*
133 : * Miscellaneous initialization
134 : *
135 : * MergeAppend nodes do have Result slots, which hold pointers to tuples,
136 : * so we have to initialize them. FIXME
137 : */
138 GIC 207 : ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
139 ECB :
140 : /* node returns slots from each of its subnodes, therefore not fixed */
141 GIC 207 : mergestate->ps.resultopsset = true;
142 CBC 207 : mergestate->ps.resultopsfixed = false;
143 ECB :
144 : /*
145 : * call ExecInitNode on each of the valid plans to be executed and save
146 : * the results into the mergeplanstates array.
147 : */
148 GIC 207 : j = 0;
149 CBC 207 : i = -1;
150 796 : while ((i = bms_next_member(validsubplans, i)) >= 0)
151 ECB : {
152 GIC 589 : Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
153 ECB :
154 GIC 589 : mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
155 ECB : }
156 :
157 GIC 207 : mergestate->ps.ps_ProjInfo = NULL;
158 ECB :
159 : /*
160 : * initialize sort-key information
161 : */
162 GIC 207 : mergestate->ms_nkeys = node->numCols;
163 CBC 207 : mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
164 ECB :
165 GIC 444 : for (i = 0; i < node->numCols; i++)
166 ECB : {
167 GIC 237 : SortSupport sortKey = mergestate->ms_sortkeys + i;
168 ECB :
169 GIC 237 : sortKey->ssup_cxt = CurrentMemoryContext;
170 CBC 237 : sortKey->ssup_collation = node->collations[i];
171 237 : sortKey->ssup_nulls_first = node->nullsFirst[i];
172 237 : sortKey->ssup_attno = node->sortColIdx[i];
173 ECB :
174 : /*
175 : * It isn't feasible to perform abbreviated key conversion, since
176 : * tuples are pulled into mergestate's binary heap as needed. It
177 : * would likely be counter-productive to convert tuples into an
178 : * abbreviated representation as they're pulled up, so opt out of that
179 : * additional optimization entirely.
180 : */
181 GIC 237 : sortKey->abbreviate = false;
182 ECB :
183 GIC 237 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
184 ECB : }
185 :
186 : /*
187 : * initialize to show we have not run the subplans yet
188 : */
189 GIC 207 : mergestate->ms_initialized = false;
190 ECB :
191 GIC 207 : return mergestate;
192 ECB : }
193 :
194 : /* ----------------------------------------------------------------
195 : * ExecMergeAppend
196 : *
197 : * Handles iteration over multiple subplans.
198 : * ----------------------------------------------------------------
199 : */
200 : static TupleTableSlot *
201 GIC 21781 : ExecMergeAppend(PlanState *pstate)
202 ECB : {
203 GIC 21781 : MergeAppendState *node = castNode(MergeAppendState, pstate);
204 ECB : TupleTableSlot *result;
205 : SlotNumber i;
206 :
207 GIC 21781 : CHECK_FOR_INTERRUPTS();
208 ECB :
209 GIC 21781 : if (!node->ms_initialized)
210 ECB : {
211 : /* Nothing to do if all subplans were pruned */
212 GIC 100 : if (node->ms_nplans == 0)
213 CBC 9 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
214 ECB :
215 : /*
216 : * If we've yet to determine the valid subplans then do so now. If
217 : * run-time pruning is disabled then the valid subplans will always be
218 : * set to all subplans.
219 : */
220 GIC 91 : if (node->ms_valid_subplans == NULL)
221 CBC 6 : node->ms_valid_subplans =
222 6 : ExecFindMatchingSubPlans(node->ms_prune_state, false);
223 ECB :
224 : /*
225 : * First time through: pull the first tuple from each valid subplan,
226 : * and set up the heap.
227 : */
228 GIC 91 : i = -1;
229 CBC 349 : while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
230 ECB : {
231 GIC 258 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
232 CBC 258 : if (!TupIsNull(node->ms_slots[i]))
233 220 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
234 ECB : }
235 GIC 91 : binaryheap_build(node->ms_heap);
236 CBC 91 : node->ms_initialized = true;
237 ECB : }
238 : else
239 : {
240 : /*
241 : * Otherwise, pull the next tuple from whichever subplan we returned
242 : * from last time, and reinsert the subplan index into the heap,
243 : * because it might now compare differently against the existing
244 : * elements of the heap. (We could perhaps simplify the logic a bit
245 : * by doing this before returning from the prior call, but it's better
246 : * to not pull tuples until necessary.)
247 : */
248 GIC 21681 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
249 CBC 21681 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
250 21681 : if (!TupIsNull(node->ms_slots[i]))
251 21548 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
252 ECB : else
253 GIC 133 : (void) binaryheap_remove_first(node->ms_heap);
254 ECB : }
255 :
256 GIC 21772 : if (binaryheap_empty(node->ms_heap))
257 ECB : {
258 : /* All the subplans are exhausted, and so is the heap */
259 GIC 64 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
260 ECB : }
261 : else
262 : {
263 GIC 21708 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
264 CBC 21708 : result = node->ms_slots[i];
265 ECB : }
266 :
267 GIC 21772 : return result;
268 ECB : }
269 :
270 : /*
271 : * Compare the tuples in the two given slots.
272 : */
273 : static int32
274 GIC 29704 : heap_compare_slots(Datum a, Datum b, void *arg)
275 ECB : {
276 GIC 29704 : MergeAppendState *node = (MergeAppendState *) arg;
277 CBC 29704 : SlotNumber slot1 = DatumGetInt32(a);
278 29704 : SlotNumber slot2 = DatumGetInt32(b);
279 ECB :
280 GIC 29704 : TupleTableSlot *s1 = node->ms_slots[slot1];
281 CBC 29704 : TupleTableSlot *s2 = node->ms_slots[slot2];
282 ECB : int nkey;
283 :
284 GIC 29704 : Assert(!TupIsNull(s1));
285 CBC 29704 : Assert(!TupIsNull(s2));
286 ECB :
287 GIC 31627 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
288 ECB : {
289 GIC 29704 : SortSupport sortKey = node->ms_sortkeys + nkey;
290 CBC 29704 : AttrNumber attno = sortKey->ssup_attno;
291 ECB : Datum datum1,
292 : datum2;
293 : bool isNull1,
294 : isNull2;
295 : int compare;
296 :
297 GIC 29704 : datum1 = slot_getattr(s1, attno, &isNull1);
298 CBC 29704 : datum2 = slot_getattr(s2, attno, &isNull2);
299 ECB :
300 GIC 29704 : compare = ApplySortComparator(datum1, isNull1,
301 ECB : datum2, isNull2,
302 : sortKey);
303 GIC 29704 : if (compare != 0)
304 ECB : {
305 GIC 27781 : INVERT_COMPARE_RESULT(compare);
306 CBC 27781 : return compare;
307 ECB : }
308 : }
309 GIC 1923 : return 0;
310 ECB : }
311 :
312 : /* ----------------------------------------------------------------
313 : * ExecEndMergeAppend
314 : *
315 : * Shuts down the subscans of the MergeAppend node.
316 : *
317 : * Returns nothing of interest.
318 : * ----------------------------------------------------------------
319 : */
320 : void
321 GIC 207 : ExecEndMergeAppend(MergeAppendState *node)
322 ECB : {
323 : PlanState **mergeplans;
324 : int nplans;
325 : int i;
326 :
327 : /*
328 : * get information from the node
329 : */
330 GIC 207 : mergeplans = node->mergeplans;
331 CBC 207 : nplans = node->ms_nplans;
332 ECB :
333 : /*
334 : * shut down each of the subscans
335 : */
336 GIC 796 : for (i = 0; i < nplans; i++)
337 CBC 589 : ExecEndNode(mergeplans[i]);
338 207 : }
339 ECB :
340 : void
341 GIC 9 : ExecReScanMergeAppend(MergeAppendState *node)
342 ECB : {
343 : int i;
344 :
345 : /*
346 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
347 : * we'd better unset the valid subplans so that they are reselected for
348 : * the new parameter values.
349 : */
350 GIC 9 : if (node->ms_prune_state &&
351 LBC 0 : bms_overlap(node->ps.chgParam,
352 UBC 0 : node->ms_prune_state->execparamids))
353 EUB : {
354 UIC 0 : bms_free(node->ms_valid_subplans);
355 UBC 0 : node->ms_valid_subplans = NULL;
356 EUB : }
357 :
358 GIC 27 : for (i = 0; i < node->ms_nplans; i++)
359 ECB : {
360 GIC 18 : PlanState *subnode = node->mergeplans[i];
361 ECB :
362 : /*
363 : * ExecReScan doesn't know about my subplans, so I have to do
364 : * changed-parameter signaling myself.
365 : */
366 GIC 18 : if (node->ps.chgParam != NULL)
367 CBC 18 : UpdateChangedParamSet(subnode, node->ps.chgParam);
368 ECB :
369 : /*
370 : * If chgParam of subnode is not null then plan will be re-scanned by
371 : * first ExecProcNode.
372 : */
373 GIC 18 : if (subnode->chgParam == NULL)
374 LBC 0 : ExecReScan(subnode);
375 EUB : }
376 GIC 9 : binaryheap_reset(node->ms_heap);
377 CBC 9 : node->ms_initialized = false;
378 9 : }
|