Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitMergeAppend - initialize the MergeAppend node
17 : * ExecMergeAppend - retrieve the next tuple from the node
18 : * ExecEndMergeAppend - shut down the MergeAppend node
19 : * ExecReScanMergeAppend - rescan the MergeAppend node
20 : *
21 : * NOTES
22 : * A MergeAppend node contains a list of one or more subplans.
23 : * These are each expected to deliver tuples that are sorted according
24 : * to a common sort key. The MergeAppend node merges these streams
25 : * to produce output sorted the same way.
26 : *
27 : * MergeAppend nodes don't make use of their left and right
28 : * subtrees, rather they maintain a list of subplans so
29 : * a typical MergeAppend node looks like this in the plan tree:
30 : *
31 : * ...
32 : * /
33 : * MergeAppend---+------+------+--- nil
34 : * / \ | | |
35 : * nil nil ... ... ...
36 : * subplans
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "executor/execdebug.h"
42 : #include "executor/execPartition.h"
43 : #include "executor/nodeMergeAppend.h"
44 : #include "lib/binaryheap.h"
45 : #include "miscadmin.h"
46 :
/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 *
 * SlotNumber values are what gets stored in the binary heap (wrapped with
 * Int32GetDatum) and are used to index both ms_slots[] and mergeplans[].
 */
typedef int32 SlotNumber;

/* Per-tuple entry point; installed as ps.ExecProcNode by ExecInitMergeAppend. */
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
/* Comparator passed to binaryheap_allocate; 'arg' is the MergeAppendState. */
static int heap_compare_slots(Datum a, Datum b, void *arg);
56 :
57 :
58 : /* ----------------------------------------------------------------
59 : * ExecInitMergeAppend
60 : *
61 : * Begin all of the subscans of the MergeAppend node.
62 : * ----------------------------------------------------------------
63 : */
64 : MergeAppendState *
4560 tgl 65 CBC 207 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 : {
67 207 : MergeAppendState *mergestate = makeNode(MergeAppendState);
68 : PlanState **mergeplanstates;
69 : Bitmapset *validsubplans;
70 : int nplans;
71 : int i,
72 : j;
73 :
74 : /* check for unsupported flags */
75 207 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76 :
77 : /*
78 : * create new MergeAppendState for our node
79 : */
80 207 : mergestate->ps.plan = (Plan *) node;
81 207 : mergestate->ps.state = estate;
2092 andres 82 207 : mergestate->ps.ExecProcNode = ExecMergeAppend;
83 :
84 : /* If run-time partition pruning is enabled, then set that up now */
129 alvherre 85 GNC 207 : if (node->part_prune_index >= 0)
86 : {
87 : PartitionPruneState *prunestate;
88 :
89 : /*
90 : * Set up pruning data structure. This also initializes the set of
91 : * subplans to initialize (validsubplans) by taking into account the
92 : * result of performing initial pruning if any.
93 : */
369 alvherre 94 CBC 27 : prunestate = ExecInitPartitionPruning(&mergestate->ps,
95 27 : list_length(node->mergeplans),
96 : node->part_prune_index,
97 : node->apprelids,
98 : &validsubplans);
1725 heikki.linnakangas 99 GIC 27 : mergestate->ms_prune_state = prunestate;
369 alvherre 100 CBC 27 : nplans = bms_num_members(validsubplans);
1725 heikki.linnakangas 101 ECB :
102 : /*
103 : * When no run-time pruning is required and there's at least one
104 : * subplan, we can fill ms_valid_subplans immediately, preventing
105 : * later calls to ExecFindMatchingSubPlans.
106 : */
1215 tgl 107 GIC 27 : if (!prunestate->do_exec_prune && nplans > 0)
1725 heikki.linnakangas 108 CBC 12 : mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
1725 heikki.linnakangas 109 ECB : }
110 : else
111 : {
1725 heikki.linnakangas 112 GIC 180 : nplans = list_length(node->mergeplans);
1725 heikki.linnakangas 113 ECB :
114 : /*
115 : * When run-time partition pruning is not enabled we can just mark all
116 : * subplans as valid; they must also all be initialized.
117 : */
1714 alvherre 118 GIC 180 : Assert(nplans > 0);
1725 heikki.linnakangas 119 CBC 180 : mergestate->ms_valid_subplans = validsubplans =
120 180 : bms_add_range(NULL, 0, nplans - 1);
121 180 : mergestate->ms_prune_state = NULL;
1725 heikki.linnakangas 122 ECB : }
123 :
1725 heikki.linnakangas 124 GIC 207 : mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
4560 tgl 125 CBC 207 : mergestate->mergeplans = mergeplanstates;
126 207 : mergestate->ms_nplans = nplans;
4560 tgl 127 ECB :
4560 tgl 128 GIC 207 : mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
3783 rhaas 129 CBC 207 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
3783 rhaas 130 ECB : mergestate);
131 :
132 : /*
133 : * Miscellaneous initialization
134 : *
135 : * MergeAppend nodes do have Result slots, which hold pointers to tuples,
136 : * so we have to initialize them. FIXME
137 : */
1606 andres 138 GIC 207 : ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
1606 andres 139 ECB :
140 : /* node returns slots from each of its subnodes, therefore not fixed */
1606 andres 141 GIC 207 : mergestate->ps.resultopsset = true;
1606 andres 142 CBC 207 : mergestate->ps.resultopsfixed = false;
4560 tgl 143 ECB :
144 : /*
145 : * call ExecInitNode on each of the valid plans to be executed and save
146 : * the results into the mergeplanstates array.
147 : */
1357 drowley 148 GIC 207 : j = 0;
1357 drowley 149 CBC 207 : i = -1;
150 796 : while ((i = bms_next_member(validsubplans, i)) >= 0)
4560 tgl 151 ECB : {
1357 drowley 152 GIC 589 : Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
4560 tgl 153 ECB :
1357 drowley 154 GIC 589 : mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
4560 tgl 155 ECB : }
156 :
4560 tgl 157 GIC 207 : mergestate->ps.ps_ProjInfo = NULL;
4560 tgl 158 ECB :
159 : /*
160 : * initialize sort-key information
161 : */
4560 tgl 162 GIC 207 : mergestate->ms_nkeys = node->numCols;
4141 tgl 163 CBC 207 : mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
4560 tgl 164 ECB :
4560 tgl 165 GIC 444 : for (i = 0; i < node->numCols; i++)
4560 tgl 166 ECB : {
3955 bruce 167 GIC 237 : SortSupport sortKey = mergestate->ms_sortkeys + i;
4397 tgl 168 ECB :
4141 tgl 169 GIC 237 : sortKey->ssup_cxt = CurrentMemoryContext;
4141 tgl 170 CBC 237 : sortKey->ssup_collation = node->collations[i];
171 237 : sortKey->ssup_nulls_first = node->nullsFirst[i];
172 237 : sortKey->ssup_attno = node->sortColIdx[i];
4141 tgl 173 ECB :
174 : /*
175 : * It isn't feasible to perform abbreviated key conversion, since
176 : * tuples are pulled into mergestate's binary heap as needed. It
177 : * would likely be counter-productive to convert tuples into an
178 : * abbreviated representation as they're pulled up, so opt out of that
179 : * additional optimization entirely.
180 : */
3002 rhaas 181 GIC 237 : sortKey->abbreviate = false;
3002 rhaas 182 ECB :
4141 tgl 183 GIC 237 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
4560 tgl 184 ECB : }
185 :
186 : /*
187 : * initialize to show we have not run the subplans yet
188 : */
4560 tgl 189 GIC 207 : mergestate->ms_initialized = false;
4560 tgl 190 ECB :
4560 tgl 191 GIC 207 : return mergestate;
4560 tgl 192 ECB : }
193 :
194 : /* ----------------------------------------------------------------
195 : * ExecMergeAppend
196 : *
197 : * Handles iteration over multiple subplans.
198 : * ----------------------------------------------------------------
199 : */
200 : static TupleTableSlot *
2092 andres 201 GIC 21781 : ExecMergeAppend(PlanState *pstate)
4560 tgl 202 ECB : {
2092 andres 203 GIC 21781 : MergeAppendState *node = castNode(MergeAppendState, pstate);
4560 tgl 204 ECB : TupleTableSlot *result;
205 : SlotNumber i;
206 :
2084 andres 207 GIC 21781 : CHECK_FOR_INTERRUPTS();
2084 andres 208 ECB :
4560 tgl 209 GIC 21781 : if (!node->ms_initialized)
4560 tgl 210 ECB : {
211 : /* Nothing to do if all subplans were pruned */
1215 tgl 212 GIC 100 : if (node->ms_nplans == 0)
1725 heikki.linnakangas 213 CBC 9 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
1725 heikki.linnakangas 214 ECB :
215 : /*
216 : * If we've yet to determine the valid subplans then do so now. If
217 : * run-time pruning is disabled then the valid subplans will always be
218 : * set to all subplans.
219 : */
1725 heikki.linnakangas 220 GIC 91 : if (node->ms_valid_subplans == NULL)
1725 heikki.linnakangas 221 CBC 6 : node->ms_valid_subplans =
369 alvherre 222 6 : ExecFindMatchingSubPlans(node->ms_prune_state, false);
1725 heikki.linnakangas 223 ECB :
224 : /*
225 : * First time through: pull the first tuple from each valid subplan,
226 : * and set up the heap.
227 : */
1725 heikki.linnakangas 228 GIC 91 : i = -1;
1725 heikki.linnakangas 229 CBC 349 : while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
4560 tgl 230 ECB : {
4560 tgl 231 GIC 258 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
4560 tgl 232 CBC 258 : if (!TupIsNull(node->ms_slots[i]))
3783 rhaas 233 220 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
4560 tgl 234 ECB : }
3783 rhaas 235 GIC 91 : binaryheap_build(node->ms_heap);
4560 tgl 236 CBC 91 : node->ms_initialized = true;
4560 tgl 237 ECB : }
238 : else
239 : {
240 : /*
241 : * Otherwise, pull the next tuple from whichever subplan we returned
242 : * from last time, and reinsert the subplan index into the heap,
243 : * because it might now compare differently against the existing
244 : * elements of the heap. (We could perhaps simplify the logic a bit
245 : * by doing this before returning from the prior call, but it's better
246 : * to not pull tuples until necessary.)
247 : */
3783 rhaas 248 GIC 21681 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
4560 tgl 249 CBC 21681 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
250 21681 : if (!TupIsNull(node->ms_slots[i]))
3783 rhaas 251 21548 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
3783 rhaas 252 ECB : else
3783 rhaas 253 GIC 133 : (void) binaryheap_remove_first(node->ms_heap);
4560 tgl 254 ECB : }
255 :
3783 rhaas 256 GIC 21772 : if (binaryheap_empty(node->ms_heap))
4560 tgl 257 ECB : {
258 : /* All the subplans are exhausted, and so is the heap */
4560 tgl 259 GIC 64 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
4560 tgl 260 ECB : }
261 : else
262 : {
3783 rhaas 263 GIC 21708 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
3783 rhaas 264 CBC 21708 : result = node->ms_slots[i];
4560 tgl 265 ECB : }
266 :
3783 rhaas 267 GIC 21772 : return result;
4560 tgl 268 ECB : }
269 :
270 : /*
271 : * Compare the tuples in the two given slots.
272 : */
273 : static int32
3783 rhaas 274 GIC 29704 : heap_compare_slots(Datum a, Datum b, void *arg)
4560 tgl 275 ECB : {
3783 rhaas 276 GIC 29704 : MergeAppendState *node = (MergeAppendState *) arg;
3783 rhaas 277 CBC 29704 : SlotNumber slot1 = DatumGetInt32(a);
278 29704 : SlotNumber slot2 = DatumGetInt32(b);
3783 rhaas 279 ECB :
4560 tgl 280 GIC 29704 : TupleTableSlot *s1 = node->ms_slots[slot1];
4560 tgl 281 CBC 29704 : TupleTableSlot *s2 = node->ms_slots[slot2];
4560 tgl 282 ECB : int nkey;
283 :
4560 tgl 284 GIC 29704 : Assert(!TupIsNull(s1));
4560 tgl 285 CBC 29704 : Assert(!TupIsNull(s2));
4560 tgl 286 ECB :
4560 tgl 287 GIC 31627 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
4560 tgl 288 ECB : {
3955 bruce 289 GIC 29704 : SortSupport sortKey = node->ms_sortkeys + nkey;
4141 tgl 290 CBC 29704 : AttrNumber attno = sortKey->ssup_attno;
4382 bruce 291 ECB : Datum datum1,
292 : datum2;
293 : bool isNull1,
294 : isNull2;
295 : int compare;
296 :
4560 tgl 297 GIC 29704 : datum1 = slot_getattr(s1, attno, &isNull1);
4560 tgl 298 CBC 29704 : datum2 = slot_getattr(s2, attno, &isNull2);
4560 tgl 299 ECB :
4141 tgl 300 GIC 29704 : compare = ApplySortComparator(datum1, isNull1,
4141 tgl 301 ECB : datum2, isNull2,
302 : sortKey);
4141 tgl 303 GIC 29704 : if (compare != 0)
1647 tgl 304 ECB : {
1647 tgl 305 GIC 27781 : INVERT_COMPARE_RESULT(compare);
1647 tgl 306 CBC 27781 : return compare;
1647 tgl 307 ECB : }
308 : }
4560 tgl 309 GIC 1923 : return 0;
4560 tgl 310 ECB : }
311 :
312 : /* ----------------------------------------------------------------
313 : * ExecEndMergeAppend
314 : *
315 : * Shuts down the subscans of the MergeAppend node.
316 : *
317 : * Returns nothing of interest.
318 : * ----------------------------------------------------------------
319 : */
320 : void
4560 tgl 321 GIC 207 : ExecEndMergeAppend(MergeAppendState *node)
4560 tgl 322 ECB : {
323 : PlanState **mergeplans;
324 : int nplans;
325 : int i;
326 :
327 : /*
328 : * get information from the node
329 : */
4560 tgl 330 GIC 207 : mergeplans = node->mergeplans;
4560 tgl 331 CBC 207 : nplans = node->ms_nplans;
4560 tgl 332 ECB :
333 : /*
334 : * shut down each of the subscans
335 : */
4560 tgl 336 GIC 796 : for (i = 0; i < nplans; i++)
4560 tgl 337 CBC 589 : ExecEndNode(mergeplans[i]);
338 207 : }
4560 tgl 339 ECB :
340 : void
4560 tgl 341 GIC 9 : ExecReScanMergeAppend(MergeAppendState *node)
4560 tgl 342 ECB : {
343 : int i;
344 :
345 : /*
346 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
347 : * we'd better unset the valid subplans so that they are reselected for
348 : * the new parameter values.
349 : */
1725 heikki.linnakangas 350 GIC 9 : if (node->ms_prune_state &&
1725 heikki.linnakangas 351 LBC 0 : bms_overlap(node->ps.chgParam,
1725 heikki.linnakangas 352 UBC 0 : node->ms_prune_state->execparamids))
1725 heikki.linnakangas 353 EUB : {
1725 heikki.linnakangas 354 UIC 0 : bms_free(node->ms_valid_subplans);
1725 heikki.linnakangas 355 UBC 0 : node->ms_valid_subplans = NULL;
1725 heikki.linnakangas 356 EUB : }
357 :
4560 tgl 358 GIC 27 : for (i = 0; i < node->ms_nplans; i++)
4560 tgl 359 ECB : {
4560 tgl 360 GIC 18 : PlanState *subnode = node->mergeplans[i];
4560 tgl 361 ECB :
362 : /*
363 : * ExecReScan doesn't know about my subplans, so I have to do
364 : * changed-parameter signaling myself.
365 : */
4560 tgl 366 GIC 18 : if (node->ps.chgParam != NULL)
4560 tgl 367 CBC 18 : UpdateChangedParamSet(subnode, node->ps.chgParam);
4560 tgl 368 ECB :
369 : /*
370 : * If chgParam of subnode is not null then plan will be re-scanned by
371 : * first ExecProcNode.
372 : */
4560 tgl 373 GIC 18 : if (subnode->chgParam == NULL)
4560 tgl 374 LBC 0 : ExecReScan(subnode);
4560 tgl 375 EUB : }
3509 tgl 376 GIC 9 : binaryheap_reset(node->ms_heap);
4560 tgl 377 CBC 9 : node->ms_initialized = false;
378 9 : }
|