Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeIncrementalSort.c
4 : * Routines to handle incremental sorting of relations.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/nodeIncrementalSort.c
11 : *
12 : * DESCRIPTION
13 : *
14 : * Incremental sort is an optimized variant of multikey sort for cases
15 : * when the input is already sorted by a prefix of the sort keys. For
16 : * example, when a sort by (key1, key2 ... keyN) is requested, and the
17 : * input is already sorted by (key1, key2 ... keyM), M < N, we can
18 : * divide the input into groups where keys (key1, ... keyM) are equal,
19 : * and only sort on the remaining columns.
20 : *
21 : * Consider the following example. We have input tuples consisting of
22 : * two integers (X, Y), already presorted by X, while we need them sorted
23 : * by both X and Y. Let the input tuples be the following:
24 : *
25 : * (1, 5)
26 : * (1, 2)
27 : * (2, 9)
28 : * (2, 1)
29 : * (2, 5)
30 : * (3, 3)
31 : * (3, 7)
32 : *
33 : * An incremental sort algorithm would split the input into the following
34 : * groups, which have equal X, and then sort them by Y individually:
35 : *
36 : * (1, 5) (1, 2)
37 : * (2, 9) (2, 1) (2, 5)
38 : * (3, 3) (3, 7)
39 : *
40 : * After sorting these groups and putting them back together, we get the
41 : * following result, which is sorted by X and Y, as requested:
42 : *
43 : * (1, 2)
44 : * (1, 5)
45 : * (2, 1)
46 : * (2, 5)
47 : * (2, 9)
48 : * (3, 3)
49 : * (3, 7)
50 : *
51 : * Incremental sort may be more efficient than plain sort, particularly
52 : * on large datasets, as it reduces the amount of data to sort at once,
53 : * making it more likely it fits into work_mem (eliminating the need to
54 : * spill to disk). But the main advantage of incremental sort is that
55 : * it can start producing rows early, before sorting the whole dataset,
56 : * which is a significant benefit especially for queries with LIMIT.
57 : *
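 : * As a purely illustrative example (the table and column names here are
 : * hypothetical), a query such as
 : *
 : *     SELECT * FROM tbl ORDER BY x, y LIMIT 10;
 : *
 : * can use an incremental sort when the rows already arrive ordered by x
 : * (for instance from an index scan on x): only the y values within each
 : * group of equal x need to be sorted, and execution can stop as soon as
 : * the LIMIT is satisfied.
 : *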
58 : * The algorithm we've implemented here is modified from the theoretical
59 : * base described above by operating in two different modes:
60 : * - Fetching a minimum number of tuples without checking prefix key
61 : * group membership and sorting on all columns when safe.
62 : * - Fetching all tuples for a single prefix key group and sorting on
63 : * solely the unsorted columns.
64 : * We always begin in the first mode, and employ a heuristic to switch
65 : * into the second mode if we believe it's beneficial.
66 : *
67 : * Sorting incrementally can potentially use less memory, avoid fetching
68 : * and sorting all tuples in the dataset, and begin returning tuples before
69 : * the entire result set is available.
70 : *
71 : * The hybrid mode approach allows us to optimize for both very small
72 : * groups (where the overhead of a new tuplesort is high) and very large
73 : * groups (where we can lower cost by not having to sort on already sorted
74 : * columns), albeit at some extra cost while switching between modes.
75 : *
76 : *-------------------------------------------------------------------------
77 : */
78 :
79 : #include "postgres.h"
80 :
81 : #include "access/htup_details.h"
82 : #include "executor/execdebug.h"
83 : #include "executor/nodeIncrementalSort.h"
84 : #include "miscadmin.h"
85 : #include "utils/lsyscache.h"
86 : #include "utils/tuplesort.h"
87 :
88 : /*
89 : * We need to store the instrumentation information in either the local node's sort
90 : * info or, for a parallel worker process, in the shared info (this avoids
91 : * having to additionally memcpy the info from local memory to shared memory
92 : * at each instrumentation call). This macro expands to choose the proper sort
93 : * state and group info.
94 : *
95 : * Arguments:
96 : * - node: type IncrementalSortState *
97 : * - groupName: the token fullsort or prefixsort
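 : *
 : * For example, INSTRUMENT_SORT_GROUP(node, fullsort) records stats for the
 : * fullsort tuplesort by pasting the token into node->fullsort_state and
 : * either node->incsort_info.fullsortGroupInfo or, in a parallel worker,
 : * the corresponding shared fullsortGroupInfo.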
98 : */
99 : #define INSTRUMENT_SORT_GROUP(node, groupName) \
100 : do { \
101 : if ((node)->ss.ps.instrument != NULL) \
102 : { \
103 : if ((node)->shared_info && (node)->am_worker) \
104 : { \
105 : Assert(IsParallelWorker()); \
106 : Assert(ParallelWorkerNumber <= (node)->shared_info->num_workers); \
107 : instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
108 : (node)->groupName##_state); \
109 : } \
110 : else \
111 : { \
112 : instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
113 : (node)->groupName##_state); \
114 : } \
115 : } \
116 : } while (0)
117 :
118 :
119 : /* ----------------------------------------------------------------
120 : * instrumentSortedGroup
121 : *
122 : * Because incremental sort processes (potentially many) sort batches, we need
123 : * to capture tuplesort stats each time we finalize a sort state. This summary
124 : * data is later used for EXPLAIN ANALYZE output.
125 : * ----------------------------------------------------------------
126 : */
127 : static void
1098 tomas.vondra 128 CBC 72 : instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo,
129 : Tuplesortstate *sortState)
130 : {
131 : TuplesortInstrumentation sort_instr;
132 :
133 72 : groupInfo->groupCount++;
134 :
135 72 : tuplesort_get_stats(sortState, &sort_instr);
136 :
137 : /* Calculate total and maximum memory and disk space used. */
138 72 : switch (sort_instr.spaceType)
139 : {
1098 tomas.vondra 140 UBC 0 : case SORT_SPACE_TYPE_DISK:
141 0 : groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
142 0 : if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
143 0 : groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;
144 :
145 0 : break;
1098 tomas.vondra 146 CBC 72 : case SORT_SPACE_TYPE_MEMORY:
147 72 : groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
148 72 : if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
149 27 : groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;
150 :
151 72 : break;
152 : }
153 :
154 : /* Track each sort method we've used. */
155 72 : groupInfo->sortMethods |= sort_instr.sortMethod;
156 72 : }
157 :
158 : /* ----------------------------------------------------------------
159 : * preparePresortedCols
160 : *
161 : * Prepare information for presorted_keys comparisons.
162 : * ----------------------------------------------------------------
163 : */
164 : static void
165 188 : preparePresortedCols(IncrementalSortState *node)
166 : {
167 188 : IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
168 :
169 188 : node->presorted_keys =
170 188 : (PresortedKeyData *) palloc(plannode->nPresortedCols *
171 : sizeof(PresortedKeyData));
172 :
173 : /* Pre-cache comparison functions for each pre-sorted key. */
174 376 : for (int i = 0; i < plannode->nPresortedCols; i++)
175 : {
176 : Oid equalityOp,
177 : equalityFunc;
178 : PresortedKeyData *key;
179 :
180 188 : key = &node->presorted_keys[i];
181 188 : key->attno = plannode->sort.sortColIdx[i];
182 :
183 188 : equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
184 : NULL);
185 188 : if (!OidIsValid(equalityOp))
1098 tomas.vondra 186 UBC 0 : elog(ERROR, "missing equality operator for ordering operator %u",
187 : plannode->sort.sortOperators[i]);
188 :
1098 tomas.vondra 189 CBC 188 : equalityFunc = get_opcode(equalityOp);
190 188 : if (!OidIsValid(equalityFunc))
1098 tomas.vondra 191 UBC 0 : elog(ERROR, "missing function for operator %u", equalityOp);
192 :
193 : /* Lookup the comparison function */
1098 tomas.vondra 194 CBC 188 : fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
195 :
196 : /* We can initialize the callinfo just once and re-use it */
197 188 : key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
198 188 : InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
199 : plannode->sort.collations[i], NULL, NULL);
200 188 : key->fcinfo->args[0].isnull = false;
201 188 : key->fcinfo->args[1].isnull = false;
202 : }
203 188 : }
204 :
205 : /* ----------------------------------------------------------------
206 : * isCurrentGroup
207 : *
208 : * Check whether a given tuple belongs to the current sort group by comparing
209 : * the presorted column values to the pivot tuple of the current group.
210 : * ----------------------------------------------------------------
211 : */
212 : static bool
213 34630 : isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
214 : {
215 : int nPresortedCols;
216 :
217 34630 : nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;
218 :
219 : /*
220 : * That the input is sorted by keys (0, ... n) implies that the tail
221 : * keys are more likely to change. Therefore we do our comparisons starting
222 : * from the last presorted column, to optimize for early detection of
223 : * inequality and to minimize the number of function calls.
224 : */
225 67999 : for (int i = nPresortedCols - 1; i >= 0; i--)
226 : {
227 : Datum datumA,
228 : datumB,
229 : result;
230 : bool isnullA,
231 : isnullB;
232 34630 : AttrNumber attno = node->presorted_keys[i].attno;
233 : PresortedKeyData *key;
234 :
235 34630 : datumA = slot_getattr(pivot, attno, &isnullA);
236 34630 : datumB = slot_getattr(tuple, attno, &isnullB);
237 :
238 : /* Special case for NULL-vs-NULL, else use standard comparison */
239 34630 : if (isnullA || isnullB)
240 : {
1098 tomas.vondra 241 UBC 0 : if (isnullA == isnullB)
242 0 : continue;
243 : else
1098 tomas.vondra 244 CBC 1261 : return false;
245 : }
246 :
247 34630 : key = &node->presorted_keys[i];
248 :
249 34630 : key->fcinfo->args[0].value = datumA;
250 34630 : key->fcinfo->args[1].value = datumB;
251 :
252 : /* just for paranoia's sake, we reset isnull each time */
253 34630 : key->fcinfo->isnull = false;
254 :
255 34630 : result = FunctionCallInvoke(key->fcinfo);
256 :
257 : /* Check for null result, since caller is clearly not expecting one */
258 34630 : if (key->fcinfo->isnull)
1098 tomas.vondra 259 UBC 0 : elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
260 :
1098 tomas.vondra 261 CBC 34630 : if (!DatumGetBool(result))
262 1261 : return false;
263 : }
264 33369 : return true;
265 : }
266 :
267 : /* ----------------------------------------------------------------
268 : * switchToPresortedPrefixMode
269 : *
270 : * When we determine that we've likely encountered a large batch of tuples all
271 : * having the same presorted prefix values, we want to optimize tuplesort by
272 : * only sorting on unsorted suffix keys.
273 : *
274 : * The problem is that we've already accumulated several tuples in another
275 : * tuplesort configured to sort by all columns (assuming that there may be
276 : * more than one prefix key group). So to switch to presorted prefix mode we
277 : * have to go back and look at all the tuples we've already accumulated to
278 : * verify they're all part of the same prefix key group before sorting them
279 : * solely by unsorted suffix keys.
280 : *
281 : * While it's likely that the tuples already fetched are all part of a single
282 : * prefix group, we also have to handle the possibility that there is at least
283 : * one different prefix key group before the large prefix key group.
284 : * ----------------------------------------------------------------
285 : */
286 : static void
287 179 : switchToPresortedPrefixMode(PlanState *pstate)
288 : {
289 179 : IncrementalSortState *node = castNode(IncrementalSortState, pstate);
290 : ScanDirection dir;
291 : int64 nTuples;
292 : TupleDesc tupDesc;
293 : PlanState *outerNode;
294 179 : IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
295 :
296 179 : dir = node->ss.ps.state->es_direction;
297 179 : outerNode = outerPlanState(node);
298 179 : tupDesc = ExecGetResultType(outerNode);
299 :
300 : /* Configure the prefix sort state the first time around. */
301 179 : if (node->prefixsort_state == NULL)
302 : {
303 : Tuplesortstate *prefixsort_state;
304 40 : int nPresortedCols = plannode->nPresortedCols;
305 :
306 : /*
307 : * Optimize the sort by assuming the prefix columns are all equal and
308 : * thus we only need to sort by any remaining columns.
309 : */
310 40 : prefixsort_state = tuplesort_begin_heap(tupDesc,
311 40 : plannode->sort.numCols - nPresortedCols,
312 40 : &(plannode->sort.sortColIdx[nPresortedCols]),
313 40 : &(plannode->sort.sortOperators[nPresortedCols]),
314 40 : &(plannode->sort.collations[nPresortedCols]),
315 40 : &(plannode->sort.nullsFirst[nPresortedCols]),
316 : work_mem,
317 : NULL,
370 drowley 318 40 : node->bounded ? TUPLESORT_ALLOWBOUNDED : TUPLESORT_NONE);
1098 tomas.vondra 319 40 : node->prefixsort_state = prefixsort_state;
320 : }
321 : else
322 : {
323 : /* Next group of presorted data */
324 139 : tuplesort_reset(node->prefixsort_state);
325 : }
326 :
327 : /*
328 : * If the current node has a bound, then it's reasonably likely that a
329 : * large prefix key group will benefit from bounded sort, so configure the
330 : * tuplesort to allow for that optimization.
331 : */
332 179 : if (node->bounded)
333 : {
334 : SO1_printf("Setting bound on presorted prefix tuplesort to: " INT64_FORMAT "\n",
335 : node->bound - node->bound_Done);
336 91 : tuplesort_set_bound(node->prefixsort_state,
337 91 : node->bound - node->bound_Done);
338 : }
339 :
340 : /*
341 : * Copy as many tuples as we can (i.e., in the same prefix key group) from
342 : * the full sort state to the prefix sort state.
343 : */
783 tgl 344 3212 : for (nTuples = 0; nTuples < node->n_fullsort_remaining; nTuples++)
345 : {
346 : /*
347 : * When we encounter multiple prefix key groups inside the full sort
348 : * tuplesort we have to carry over the last read tuple into the next
349 : * batch.
350 : */
351 3161 : if (nTuples == 0 && !TupIsNull(node->transfer_tuple))
352 : {
1098 tomas.vondra 353 128 : tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
354 : /* The carried over tuple is our new group pivot tuple. */
355 128 : ExecCopySlot(node->group_pivot, node->transfer_tuple);
356 : }
357 : else
358 : {
359 3033 : tuplesort_gettupleslot(node->fullsort_state,
360 : ScanDirectionIsForward(dir),
361 : false, node->transfer_tuple, NULL);
362 :
363 : /*
364 : * If this is our first time through the loop, then we need to
365 : * save the first tuple we get as our new group pivot.
366 : */
367 3033 : if (TupIsNull(node->group_pivot))
368 51 : ExecCopySlot(node->group_pivot, node->transfer_tuple);
369 :
370 3033 : if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
371 : {
372 2905 : tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
373 : }
374 : else
375 : {
376 : /*
377 : * The tuple isn't part of the current batch so we need to
378 : * carry it over into the next batch of tuples we transfer out
379 : * of the full sort tuplesort into the presorted prefix
380 : * tuplesort. We don't actually have to do anything special to
381 : * save the tuple since we've already loaded it into the
382 : * node->transfer_tuple slot, and, even though that slot
383 : * points to memory inside the full sort tuplesort, we can't
384 : * reset that tuplesort anyway until we've fully transferred
385 : * out its tuples, so this reference is safe. We do need to
386 : * reset the group pivot tuple though since we've finished the
387 : * current prefix key group.
388 : */
389 128 : ExecClearTuple(node->group_pivot);
390 :
391 : /* Break out of for-loop early */
392 128 : break;
393 : }
394 : }
395 : }
396 :
397 : /*
398 : * Track how many tuples remain in the full sort batch so that we know if
399 : * we need to sort multiple prefix key groups before processing tuples
400 : * remaining in the large single prefix key group we think we've
401 : * encountered.
402 : */
403 : SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);
404 179 : node->n_fullsort_remaining -= nTuples;
405 : SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT "\n", node->n_fullsort_remaining);
406 :
783 tgl 407 179 : if (node->n_fullsort_remaining == 0)
408 : {
409 : /*
410 : * We've found that all tuples remaining in the full sort batch are in
411 : * the same prefix key group and moved all of those tuples into the
412 : * presorted prefix tuplesort. We don't know that we've yet found the
413 : * last tuple in the current prefix key group, so save our pivot
414 : * comparison tuple and continue fetching tuples from the outer
415 : * execution node to load into the presorted prefix tuplesort.
416 : */
1098 tomas.vondra 417 51 : ExecCopySlot(node->group_pivot, node->transfer_tuple);
418 : SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
419 51 : node->execution_status = INCSORT_LOADPREFIXSORT;
420 :
421 : /*
422 : * Make sure we clear the transfer tuple slot so that next time we
423 : * encounter a large prefix key group we don't incorrectly assume we
424 : * have a tuple carried over from the previous group.
425 : */
426 51 : ExecClearTuple(node->transfer_tuple);
427 : }
428 : else
429 : {
430 : /*
431 : * We finished a group but didn't consume all of the tuples from the
432 : * full sort state, so we'll sort this batch, let the outer node read
433 : * out all of those tuples, and then come back around to find another
434 : * batch.
435 : */
436 : SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
437 128 : tuplesort_performsort(node->prefixsort_state);
438 :
1060 tgl 439 128 : INSTRUMENT_SORT_GROUP(node, prefixsort);
440 :
1098 tomas.vondra 441 128 : if (node->bounded)
442 : {
443 : /*
444 : * If the current node has a bound and we've already sorted n
445 : * tuples, then the functional bound remaining is (original bound
446 : * - n), so store the current number of processed tuples for use
447 : * in configuring sorting bound.
448 : */
449 : SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
450 : Min(node->bound, node->bound_Done + nTuples), node->bound_Done);
451 60 : node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
452 : }
453 :
454 : SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
455 128 : node->execution_status = INCSORT_READPREFIXSORT;
456 : }
457 179 : }
458 :
459 : /*
460 : * Sorting many small groups with tuplesort is inefficient. In order to
461 : * cope with this problem, we don't start a new group until the current one
462 : * contains at least DEFAULT_MIN_GROUP_SIZE tuples (unfortunately this also
463 : * means we can't assume small groups of tuples all have the same prefix keys).
464 : * When we have a bound that's less than DEFAULT_MIN_GROUP_SIZE, we start looking
465 : * for a new group as soon as we've met our bound, to avoid fetching more
466 : * tuples than we absolutely have to.
467 : */
468 : #define DEFAULT_MIN_GROUP_SIZE 32
469 :
470 : /*
471 : * While we've optimized for small prefix key groups by not starting our prefix
472 : * key comparisons until we've reached a minimum number of tuples, we don't want
473 : * that optimization to cause us to lose out on the benefits of being able to
474 : * assume a large group of tuples is fully presorted by its prefix keys.
475 : * Therefore we use the DEFAULT_MAX_FULL_SORT_GROUP_SIZE cutoff as a heuristic
476 : * for determining when we believe we've encountered a large group, and, if we
477 : * get to that point without finding a new prefix key group, we transition to
478 : * presorted prefix key mode.
479 : */
480 : #define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)
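 :
 : /*
 :  * For illustration: with the default values above (32 and 64), each batch
 :  * accumulates at least 32 tuples before any prefix key comparisons begin,
 :  * and if more than 64 tuples accumulate without the prefix keys changing
 :  * we assume we've encountered a large group and switch to presorted
 :  * prefix mode.
 :  */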
481 :
482 : /* ----------------------------------------------------------------
483 : * ExecIncrementalSort
484 : *
485 : * Assuming that the outer subtree returns tuples presorted by some prefix
486 : * of the target sort columns, perform an incremental sort.
487 : *
488 : * Conditions:
489 : * -- none.
490 : *
491 : * Initial States:
492 : * -- the outer child is prepared to return the first tuple.
493 : * ----------------------------------------------------------------
494 : */
495 : static TupleTableSlot *
496 51144 : ExecIncrementalSort(PlanState *pstate)
497 : {
498 51144 : IncrementalSortState *node = castNode(IncrementalSortState, pstate);
499 : EState *estate;
500 : ScanDirection dir;
501 : Tuplesortstate *read_sortstate;
502 : Tuplesortstate *fullsort_state;
503 : TupleTableSlot *slot;
504 51144 : IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
505 : PlanState *outerNode;
506 : TupleDesc tupDesc;
507 51144 : int64 nTuples = 0;
508 : int64 minGroupSize;
509 :
510 51144 : CHECK_FOR_INTERRUPTS();
511 :
512 51144 : estate = node->ss.ps.state;
513 51144 : dir = estate->es_direction;
514 51144 : fullsort_state = node->fullsort_state;
515 :
516 : /*
517 : * If a previous iteration has sorted a batch, then we need to check to
518 : * see if there are any remaining tuples in that batch that we can return
519 : * before moving on to other execution states.
520 : */
521 51144 : if (node->execution_status == INCSORT_READFULLSORT
522 11219 : || node->execution_status == INCSORT_READPREFIXSORT)
523 : {
524 : /*
525 : * Return next tuple from the current sorted group set if available.
526 : */
527 101912 : read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
528 50956 : fullsort_state : node->prefixsort_state;
529 50956 : slot = node->ss.ps.ps_ResultTupleSlot;
530 :
531 : /*
532 : * We have to populate the slot from the tuplesort before checking
533 : * outerNodeDone because tuplesort_gettupleslot will clear the slot if no more
534 : * tuples remain. If the tuplesort is empty and we don't have any
535 : * more tuples available for sort from the outer node, then
536 : * outerNodeDone will have been set so we'll return that now-empty
537 : * slot to the caller.
538 : */
539 50956 : if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
540 1309 : false, slot, NULL) || node->outerNodeDone)
541 :
542 : /*
543 : * Note: there isn't a good test case for the node->outerNodeDone
544 : * check directly, but we need it for any plan where the outer
545 : * node will fail when trying to fetch too many tuples.
546 : */
547 49753 : return slot;
548 1203 : else if (node->n_fullsort_remaining > 0)
549 : {
550 : /*
551 : * When we transition to presorted prefix mode, we might have
552 : * accumulated at least one additional prefix key group in the
553 : * full sort tuplesort. The first call to
554 : * switchToPresortedPrefixMode() will have pulled the first one of
555 : * those groups out, and we've returned those tuples to the parent
556 : * node, but if at this point we still have tuples remaining in
557 : * the full sort state (i.e., n_fullsort_remaining > 0), then we
558 : * need to re-execute the prefix mode transition function to pull
559 : * out the next prefix key group.
560 : */
561 : SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
562 : node->n_fullsort_remaining);
563 128 : switchToPresortedPrefixMode(pstate);
564 : }
565 : else
566 : {
567 : /*
568 : * If we don't have any sorted tuples to read and we're not
569 : * currently transitioning into presorted prefix sort mode, then
570 : * it's time to start the process all over again by building a new
571 : * group in the full sort state.
572 : */
573 : SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");
574 1075 : node->execution_status = INCSORT_LOADFULLSORT;
575 : }
576 : }
577 :
578 : /*
579 : * Scan the subplan in the forward direction while creating the sorted
580 : * data.
581 : */
582 1391 : estate->es_direction = ForwardScanDirection;
583 :
584 1391 : outerNode = outerPlanState(node);
585 1391 : tupDesc = ExecGetResultType(outerNode);
586 :
587 : /* Load tuples into the full sort state. */
588 1391 : if (node->execution_status == INCSORT_LOADFULLSORT)
589 : {
590 : /*
591 : * Initialize sorting structures.
592 : */
593 1263 : if (fullsort_state == NULL)
594 : {
595 : /*
596 : * Initialize presorted column support structures for
597 : * isCurrentGroup(). It's correct to do this along with the
598 : * initial initialization for the full sort state (and not for the
599 : * prefix sort state) since we always load the full sort state
600 : * first.
601 : */
602 188 : preparePresortedCols(node);
603 :
604 : /*
605 : * Since we optimize small prefix key groups by accumulating a
606 : * minimum number of tuples before sorting, we can't assume that a
607 : * group of tuples all have the same prefix key values. Hence we
608 : * setup the full sort tuplesort to sort by all requested sort
609 : * keys.
610 : */
611 188 : fullsort_state = tuplesort_begin_heap(tupDesc,
612 : plannode->sort.numCols,
613 : plannode->sort.sortColIdx,
614 : plannode->sort.sortOperators,
615 : plannode->sort.collations,
616 : plannode->sort.nullsFirst,
617 : work_mem,
618 : NULL,
370 drowley 619 188 : node->bounded ?
620 : TUPLESORT_ALLOWBOUNDED :
621 : TUPLESORT_NONE);
1098 tomas.vondra 622 188 : node->fullsort_state = fullsort_state;
623 : }
624 : else
625 : {
626 : /* Reset sort for the next batch. */
627 1075 : tuplesort_reset(fullsort_state);
628 : }
629 :
630 : /*
631 : * Calculate the remaining tuples left if bounded and configure both
632 : * bounded sort and the minimum group size accordingly.
633 : */
634 1263 : if (node->bounded)
635 : {
636 106 : int64 currentBound = node->bound - node->bound_Done;
637 :
638 : /*
639 : * Bounded sort isn't likely to be a useful optimization for full
640 : * sort mode since we limit full sort mode to a relatively small
641 : * number of tuples and tuplesort doesn't switch over to top-n
642 : * heap sort anyway unless it hits (2 * bound) tuples.
643 : */
644 106 : if (currentBound < DEFAULT_MIN_GROUP_SIZE)
645 39 : tuplesort_set_bound(fullsort_state, currentBound);
646 :
647 106 : minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
648 : }
649 : else
650 1157 : minGroupSize = DEFAULT_MIN_GROUP_SIZE;
651 :
652 : /*
653 : * Because we have to read the next tuple to find out that we've
654 : * encountered a new prefix key group, on subsequent groups we have to
655 : * carry over that extra tuple and add it to the new group's sort here
656 : * before we read any new tuples from the outer node.
657 : */
658 1263 : if (!TupIsNull(node->group_pivot))
659 : {
660 1075 : tuplesort_puttupleslot(fullsort_state, node->group_pivot);
661 1075 : nTuples++;
662 :
663 : /*
664 : * We're in full sort mode accumulating a minimum number of tuples
665 : * and not checking for prefix key equality yet, so we can't
666 : * assume the group pivot tuple will remain the same -- unless
667 : * we're using a minimum group size of 1, in which case the pivot
668 : * is obviously still the pivot.
669 : */
670 1075 : if (nTuples != minGroupSize)
671 1069 : ExecClearTuple(node->group_pivot);
672 : }
673 :
674 :
675 : /*
676 : * Pull as many tuples from the outer node as possible given our
677 : * current operating mode.
678 : */
679 : for (;;)
680 : {
681 44043 : slot = ExecProcNode(outerNode);
682 :
683 : /*
684 : * If the outer node can't provide us any more tuples, then we can
685 : * sort the current group and return those tuples.
686 : */
687 44043 : if (TupIsNull(slot))
688 : {
689 : /*
690 : * We need to know later if the outer node has completed to be
691 : * able to distinguish between being done with a batch and
692 : * being done with the whole node.
693 : */
694 108 : node->outerNodeDone = true;
695 :
696 : SO1_printf("Sorting fullsort with " INT64_FORMAT " tuples\n", nTuples);
697 108 : tuplesort_performsort(fullsort_state);
698 :
1060 tgl 699 108 : INSTRUMENT_SORT_GROUP(node, fullsort);
700 :
701 : SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
1098 tomas.vondra 702 108 : node->execution_status = INCSORT_READFULLSORT;
703 108 : break;
704 : }
705 :
706 : /* Accumulate the next group of presorted tuples. */
707 43935 : if (nTuples < minGroupSize)
708 : {
709 : /*
710 : * If we haven't yet hit our target minimum group size, then
711 : * we don't need to bother checking for inclusion in the
712 : * current prefix group since at this point we'll assume that
713 : * we'll full sort this batch to avoid a large number of very
714 : * tiny (and thus inefficient) sorts.
715 : */
716 35968 : tuplesort_puttupleslot(fullsort_state, slot);
717 35968 : nTuples++;
718 :
719 : /*
720 : * If we've reached our minimum group size, then we need to
721 : * store the most recent tuple as a pivot.
722 : */
723 35968 : if (nTuples == minGroupSize)
724 1149 : ExecCopySlot(node->group_pivot, slot);
725 : }
726 : else
727 : {
728 : /*
729 : * If we've already accumulated enough tuples to reach our
730 : * minimum group size, then we need to compare any additional
731 : * tuples to our pivot tuple to see if we reach the end of
732 : * that prefix key group. Only after we find changed prefix
733 : * keys can we guarantee sort stability of the tuples we've
734 : * already accumulated.
735 : */
736 7967 : if (isCurrentGroup(node, node->group_pivot, slot))
737 : {
738 : /*
739 : * As long as the prefix keys match the pivot tuple then
740 : * load the tuple into the tuplesort.
741 : */
742 6863 : tuplesort_puttupleslot(fullsort_state, slot);
743 6863 : nTuples++;
744 : }
745 : else
746 : {
747 : /*
748 : * Since the tuple we fetched isn't part of the current
749 : * prefix key group we don't want to sort it as part of
750 : * the current batch. Instead we use the group_pivot slot
751 : * to carry it over to the next batch (even though we
752 : * won't actually treat it as a group pivot).
753 : */
754 1104 : ExecCopySlot(node->group_pivot, slot);
755 :
756 1104 : if (node->bounded)
757 : {
758 : /*
759 : * If the current node has a bound, and we've already
760 : * sorted n tuples, then the functional bound
761 : * remaining is (original bound - n), so store the
762 : * current number of processed tuples for later use
763 : * configuring the sort state's bound.
764 : */
765 : SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
766 : node->bound_Done,
767 : Min(node->bound, node->bound_Done + nTuples));
768 75 : node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
769 : }
770 :
771 : /*
772 : * Once we find changed prefix keys we can complete the
773 : * sort and transition modes to reading out the sorted
774 : * tuples.
775 : */
776 : SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n",
777 : nTuples);
778 1104 : tuplesort_performsort(fullsort_state);
779 :
1060 tgl 780 1104 : INSTRUMENT_SORT_GROUP(node, fullsort);
781 :
782 : SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
1098 tomas.vondra 783 1104 : node->execution_status = INCSORT_READFULLSORT;
784 1104 : break;
785 : }
786 : }
787 :
788 : /*
789 : * Unless we've already transitioned modes to reading from the
790 : * full sort state, then we assume that having read at least
791 : * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
792 : * processing a large group of tuples all having equal prefix keys
793 : * (but haven't yet found the final tuple in that prefix key
794 : * group), so we need to transition into presorted prefix mode.
795 : */
796 42831 : if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
797 51 : node->execution_status != INCSORT_READFULLSORT)
798 : {
799 : /*
800 : * The group pivot we have stored has already been put into
801 : * the tuplesort; we don't want to carry it over. Since we
802 : * haven't yet found the end of the prefix key group, it might
803 : * seem like we should keep this, but we don't actually know
804 : * how many prefix key groups might be represented in the full
805 : * sort state, so we'll let the mode transition function
806 : * manage this state for us.
807 : */
808 51 : ExecClearTuple(node->group_pivot);
809 :
810 : /*
811 : * Unfortunately the tuplesort API doesn't include a way to
812 : * retrieve tuples unless a sort has been performed, so we
813 : * perform the sort even though we could just as easily rely
814 : * on FIFO retrieval semantics when transferring them to the
815 : * presorted prefix tuplesort.
816 : */
817 : SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n", nTuples);
818 51 : tuplesort_performsort(fullsort_state);
819 :
1060 tgl 820 51 : INSTRUMENT_SORT_GROUP(node, fullsort);
821 :
822 : /*
823 : * If the full sort tuplesort happened to switch into top-n
824 : * heapsort mode then we will only be able to retrieve
825 : * currentBound tuples (since the tuplesort will have only
826 : * retained the top-n tuples). This is safe even though we
827 : * haven't yet completed fetching the current prefix key group
828 : * because the tuples we've "lost" already sorted "below" the
829 : * retained ones, and we're already contractually guaranteed
830 : * to not need any more than the currentBound tuples.
831 : */
1098 tomas.vondra 832 51 : if (tuplesort_used_bound(node->fullsort_state))
833 : {
834 6 : int64 currentBound = node->bound - node->bound_Done;
835 :
836 : SO2_printf("Read " INT64_FORMAT " tuples, but setting to " INT64_FORMAT " because we used bounded sort\n",
837 : nTuples, Min(currentBound, nTuples));
838 6 : nTuples = Min(currentBound, nTuples);
839 : }
840 :
841 : SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
842 : nTuples);
843 :
844 : /*
845 : * We might have multiple prefix key groups in the full sort
846 : * state, so the mode transition function needs to know that
847 : * it needs to move from the fullsort to presorted prefix
848 : * sort.
849 : */
850 51 : node->n_fullsort_remaining = nTuples;
851 :
852 : /* Transition the tuples to the presorted prefix tuplesort. */
853 51 : switchToPresortedPrefixMode(pstate);
854 :
855 : /*
856 : * Since we know we had tuples to move to the presorted prefix
857 : * tuplesort, we know that unless that transition has verified
858 : * that all tuples belonged to the same prefix key group (in
859 : * which case we can go straight to continuing to load tuples
860 : * into that tuplesort), we should have a tuple to return
861 : * here.
862 : *
863 : * Either way, the appropriate execution status should have
864 : * been set by switchToPresortedPrefixMode(), so we can drop
865 : * out of the loop here and let the appropriate path kick in.
866 : */
867 51 : break;
868 : }
869 : }
870 : }
871 :
872 1391 : if (node->execution_status == INCSORT_LOADPREFIXSORT)
873 : {
874 : /*
875 : * We only enter this state after the mode transition function has
876 : * confirmed all remaining tuples from the full sort state have the
877 : * same prefix and moved those tuples to the prefix sort state. That
878 : * function has also set a group pivot tuple (which doesn't need to be
879 : * carried over; it's already been put into the prefix sort state).
880 : */
881 51 : Assert(!TupIsNull(node->group_pivot));
882 :
883 : /*
884 : * Read tuples from the outer node and load them into the prefix sort
885 : * state until we encounter a tuple whose prefix keys don't match the
886 : * current group_pivot tuple, since we can't guarantee sort stability
887 : * until we have all tuples matching those prefix keys.
888 : */
889 : for (;;)
890 : {
891 23652 : slot = ExecProcNode(outerNode);
892 :
893 : /*
894 : * If we've exhausted tuples from the outer node we're done
895 : * loading the prefix sort state.
896 : */
897 23652 : if (TupIsNull(slot))
898 : {
899 : /*
900 : * We need to know later if the outer node has completed to be
901 : * able to distinguish between being done with a batch and
902 : * being done with the whole node.
903 : */
904 22 : node->outerNodeDone = true;
905 22 : break;
906 : }
907 :
908 : /*
909 : * If the tuple's prefix keys match our pivot tuple, we're not
910 : * done yet and can load it into the prefix sort state. If not, we
911 : * don't want to sort it as part of the current batch. Instead we
912 : * use the group_pivot slot to carry it over to the next batch
913 : * (even though we won't actually treat it as a group pivot).
914 : */
915 23630 : if (isCurrentGroup(node, node->group_pivot, slot))
916 : {
917 23601 : tuplesort_puttupleslot(node->prefixsort_state, slot);
918 23601 : nTuples++;
919 : }
920 : else
921 : {
922 29 : ExecCopySlot(node->group_pivot, slot);
923 29 : break;
924 : }
925 : }
926 :
927 : /*
928 : * Perform the sort and begin returning the tuples to the parent plan
929 : * node.
930 : */
931 : SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
932 51 : tuplesort_performsort(node->prefixsort_state);
933 :
1060 tgl 934 51 : INSTRUMENT_SORT_GROUP(node, prefixsort);
935 :
936 : SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
1098 tomas.vondra 937 51 : node->execution_status = INCSORT_READPREFIXSORT;
938 :
939 51 : if (node->bounded)
940 : {
941 : /*
942 : * If the current node has a bound, and we've already sorted n
943 : * tuples, then the functional bound remaining is (original bound
944 : * - n), so store the current number of processed tuples for use
945 : * in configuring sorting bound.
946 : */
947 : SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
948 : node->bound_Done,
949 : Min(node->bound, node->bound_Done + nTuples));
950 31 : node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
951 : }
952 : }
953 :
954 : /* Restore to user specified direction. */
955 1391 : estate->es_direction = dir;
956 :
957 : /*
958 : * Get the first or next tuple from tuplesort. Returns NULL if no more
959 : * tuples.
960 : */
961 2782 : read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
962 1391 : fullsort_state : node->prefixsort_state;
963 1391 : slot = node->ss.ps.ps_ResultTupleSlot;
964 1391 : (void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
965 : false, slot, NULL);
966 1391 : return slot;
967 : }
968 :
969 : /* ----------------------------------------------------------------
970 : * ExecInitIncrementalSort
971 : *
972 : * Creates the run-time state information for the sort node
973 : * produced by the planner and initializes its outer subtree.
974 : * ----------------------------------------------------------------
975 : */
976 : IncrementalSortState *
977 297 : ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
978 : {
979 : IncrementalSortState *incrsortstate;
980 :
981 : SO_printf("ExecInitIncrementalSort: initializing sort node\n");
982 :
983 : /*
984 : * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
985 : * EXEC_FLAG_MARK, because the current sort state contains only one sort
986 : * batch rather than the full result set.
987 : */
1065 988 297 : Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);
989 :
990 : /* Initialize state structure. */
1098 991 297 : incrsortstate = makeNode(IncrementalSortState);
992 297 : incrsortstate->ss.ps.plan = (Plan *) node;
993 297 : incrsortstate->ss.ps.state = estate;
994 297 : incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
995 :
996 297 : incrsortstate->execution_status = INCSORT_LOADFULLSORT;
997 297 : incrsortstate->bounded = false;
998 297 : incrsortstate->outerNodeDone = false;
999 297 : incrsortstate->bound_Done = 0;
1000 297 : incrsortstate->fullsort_state = NULL;
1001 297 : incrsortstate->prefixsort_state = NULL;
1002 297 : incrsortstate->group_pivot = NULL;
1003 297 : incrsortstate->transfer_tuple = NULL;
1004 297 : incrsortstate->n_fullsort_remaining = 0;
1005 297 : incrsortstate->presorted_keys = NULL;
1006 :
1007 297 : if (incrsortstate->ss.ps.instrument != NULL)
1008 : {
1098 tomas.vondra 1009 UBC 0 : IncrementalSortGroupInfo *fullsortGroupInfo =
1010 : &incrsortstate->incsort_info.fullsortGroupInfo;
1011 0 : IncrementalSortGroupInfo *prefixsortGroupInfo =
1012 : &incrsortstate->incsort_info.prefixsortGroupInfo;
1013 :
1014 0 : fullsortGroupInfo->groupCount = 0;
1015 0 : fullsortGroupInfo->maxDiskSpaceUsed = 0;
1016 0 : fullsortGroupInfo->totalDiskSpaceUsed = 0;
1017 0 : fullsortGroupInfo->maxMemorySpaceUsed = 0;
1018 0 : fullsortGroupInfo->totalMemorySpaceUsed = 0;
1019 0 : fullsortGroupInfo->sortMethods = 0;
1020 0 : prefixsortGroupInfo->groupCount = 0;
1021 0 : prefixsortGroupInfo->maxDiskSpaceUsed = 0;
1022 0 : prefixsortGroupInfo->totalDiskSpaceUsed = 0;
1023 0 : prefixsortGroupInfo->maxMemorySpaceUsed = 0;
1024 0 : prefixsortGroupInfo->totalMemorySpaceUsed = 0;
1025 0 : prefixsortGroupInfo->sortMethods = 0;
1026 : }
1027 :
1028 : /*
1029 : * Miscellaneous initialization
1030 : *
1031 : * Sort nodes don't initialize their ExprContexts because they never call
1032 : * ExecQual or ExecProject.
1033 : */
1034 :
1035 : /*
1036 : * Initialize child nodes.
1037 : *
1038 : * Incremental sort does not support backwards scans and mark/restore, so
1039 : * we don't bother removing the flags from eflags here. We allow passing a
1040 : * REWIND flag, because although incremental sort can't use it, the child
1041 : * nodes may be able to do something more useful.
1042 : */
1098 tomas.vondra 1043 CBC 297 : outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
1044 :
1045 : /*
1046 : * Initialize scan slot and type.
1047 : */
1048 297 : ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);
1049 :
1050 : /*
1051 : * Initialize return slot and type. No need to initialize projection info
1052 : * because we don't do any projections.
1053 : */
1054 297 : ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
1055 297 : incrsortstate->ss.ps.ps_ProjInfo = NULL;
1056 :
1057 : /*
1058 : * Initialize standalone slots to store a tuple for pivot prefix keys and
1059 : * for carrying over a tuple from one batch to the next.
1060 : */
1061 297 : incrsortstate->group_pivot =
1062 297 : MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1063 : &TTSOpsMinimalTuple);
1064 297 : incrsortstate->transfer_tuple =
1065 297 : MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1066 : &TTSOpsMinimalTuple);
1067 :
1068 : SO_printf("ExecInitIncrementalSort: sort node initialized\n");
1069 :
1070 297 : return incrsortstate;
1071 : }
1072 :
1073 : /* ----------------------------------------------------------------
1074 : * ExecEndIncrementalSort(node)
1075 : * ----------------------------------------------------------------
1076 : */
1077 : void
1078 297 : ExecEndIncrementalSort(IncrementalSortState *node)
1079 : {
1080 : SO_printf("ExecEndIncrementalSort: shutting down sort node\n");
1081 :
1082 : /* clean out the scan tuple */
1083 297 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
1084 : /* must drop pointer to sort result tuple */
1085 297 : ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1086 : /* must drop standalone tuple slots from outer node */
1087 297 : ExecDropSingleTupleTableSlot(node->group_pivot);
1088 297 : ExecDropSingleTupleTableSlot(node->transfer_tuple);
1089 :
1090 : /*
1091 : * Release tuplesort resources.
1092 : */
1093 297 : if (node->fullsort_state != NULL)
1094 : {
1095 185 : tuplesort_end(node->fullsort_state);
1096 185 : node->fullsort_state = NULL;
1097 : }
1098 297 : if (node->prefixsort_state != NULL)
1099 : {
1100 37 : tuplesort_end(node->prefixsort_state);
1101 37 : node->prefixsort_state = NULL;
1102 : }
1103 :
1104 : /*
1105 : * Shut down the subplan.
1106 : */
1107 297 : ExecEndNode(outerPlanState(node));
1108 :
1109 : SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
1110 297 : }
1111 :
1112 : void
1113 6 : ExecReScanIncrementalSort(IncrementalSortState *node)
1114 : {
1115 6 : PlanState *outerPlan = outerPlanState(node);
1116 :
1117 : /*
1118 : * Incremental sort doesn't support efficient rescan even when parameters
1119 : * haven't changed (e.g., rewind) because unlike regular sort we don't
1120 : * store all tuples at once for the full sort.
1121 : *
1122 : * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
1123 : * re-execute the sort along with the child node. Incremental sort itself
1124 : * can't do anything smarter, but maybe the child nodes can.
1125 : *
1126 : * In theory if we've only filled the full sort with one batch (and
1127 : * haven't reset it for a new batch yet) then we could efficiently rewind,
1128 : * but that seems a narrow enough case that it's not worth handling
1129 : * specially at this time.
1130 : */
1131 :
1132 : /* must drop pointer to sort result tuple */
1133 6 : ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1134 :
1135 6 : if (node->group_pivot != NULL)
1136 6 : ExecClearTuple(node->group_pivot);
1137 6 : if (node->transfer_tuple != NULL)
1138 6 : ExecClearTuple(node->transfer_tuple);
1139 :
1140 6 : node->outerNodeDone = false;
1141 6 : node->n_fullsort_remaining = 0;
1142 6 : node->bound_Done = 0;
1143 6 : node->presorted_keys = NULL;
1144 :
1145 6 : node->execution_status = INCSORT_LOADFULLSORT;
1146 :
1147 : /*
1148 : * If we've already set up either of the sort states, we need to reset them.
1149 : * We could end them and null out the pointers, but there's no reason to
1150 : * repay the setup cost, and because ExecIncrementalSort guards presorted
1151 : * column functions by checking to see if the full sort state has been
1152 : * initialized yet, setting the sort states to null here might actually
1153 : * cause a leak.
1154 : */
1155 6 : if (node->fullsort_state != NULL)
1156 : {
1157 3 : tuplesort_reset(node->fullsort_state);
1158 3 : node->fullsort_state = NULL;
1159 : }
1160 6 : if (node->prefixsort_state != NULL)
1161 : {
1162 3 : tuplesort_reset(node->prefixsort_state);
1163 3 : node->prefixsort_state = NULL;
1164 : }
1165 :
1166 : /*
1167 : * If chgParam of subnode is not null, then the plan will be re-scanned by
1168 : * the first ExecProcNode.
1169 : */
1170 6 : if (outerPlan->chgParam == NULL)
1171 6 : ExecReScan(outerPlan);
1172 6 : }
1173 :
1174 : /* ----------------------------------------------------------------
1175 : * Parallel Query Support
1176 : * ----------------------------------------------------------------
1177 : */
1178 :
1179 : /* ----------------------------------------------------------------
1180 : * ExecIncrementalSortEstimate
1181 : *
1182 : * Estimate space required to propagate sort statistics.
1183 : * ----------------------------------------------------------------
1184 : */
1185 : void
1098 tomas.vondra 1186 UBC 0 : ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
1187 : {
1188 : Size size;
1189 :
1190 : /* don't need this if not instrumenting or no workers */
1191 0 : if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1192 0 : return;
1193 :
1194 0 : size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
1195 0 : size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
1196 0 : shm_toc_estimate_chunk(&pcxt->estimator, size);
1197 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1198 : }
1199 :
1200 : /* ----------------------------------------------------------------
1201 : * ExecIncrementalSortInitializeDSM
1202 : *
1203 : * Initialize DSM space for sort statistics.
1204 : * ----------------------------------------------------------------
1205 : */
1206 : void
1207 0 : ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
1208 : {
1209 : Size size;
1210 :
1211 : /* don't need this if not instrumenting or no workers */
1212 0 : if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1213 0 : return;
1214 :
1215 0 : size = offsetof(SharedIncrementalSortInfo, sinfo)
1216 0 : + pcxt->nworkers * sizeof(IncrementalSortInfo);
1217 0 : node->shared_info = shm_toc_allocate(pcxt->toc, size);
1218 : /* ensure any unfilled slots will contain zeroes */
1219 0 : memset(node->shared_info, 0, size);
1220 0 : node->shared_info->num_workers = pcxt->nworkers;
1221 0 : shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
1222 0 : node->shared_info);
1223 : }
1224 :
1225 : /* ----------------------------------------------------------------
1226 : * ExecIncrementalSortInitializeWorker
1227 : *
1228 : * Attach worker to DSM space for sort statistics.
1229 : * ----------------------------------------------------------------
1230 : */
1231 : void
1232 0 : ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
1233 : {
1234 0 : node->shared_info =
1235 0 : shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
1236 0 : node->am_worker = true;
1237 0 : }
1238 :
1239 : /* ----------------------------------------------------------------
1240 : * ExecIncrementalSortRetrieveInstrumentation
1241 : *
1242 : * Transfer sort statistics from DSM to private memory.
1243 : * ----------------------------------------------------------------
1244 : */
1245 : void
1246 0 : ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
1247 : {
1248 : Size size;
1249 : SharedIncrementalSortInfo *si;
1250 :
1251 0 : if (node->shared_info == NULL)
1252 0 : return;
1253 :
1254 0 : size = offsetof(SharedIncrementalSortInfo, sinfo)
1255 0 : + node->shared_info->num_workers * sizeof(IncrementalSortInfo);
1256 0 : si = palloc(size);
1257 0 : memcpy(si, node->shared_info, size);
1258 0 : node->shared_info = si;
1259 : }