/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *    Generalized tuple sorting routines.
 *
 * This module provides a generalized facility for tuple sorting, which can be
 * applied to different kinds of sortable objects.  Implementation of
 * the particular sorting variants is given in tuplesortvariants.c.
 * This module works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about external
 * sorting algorithms.  The algorithm we use is a balanced k-way merge.
 * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
 * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
 * merge is better.  Knuth assumed that tape drives are expensive beasts,
 * and in particular that there will always be many more runs than tape
 * drives.  The polyphase merge algorithm was good at keeping all the tape
 * drives busy, but in our implementation a "tape drive" doesn't cost much
 * more than a few kB of memory buffers, so we can afford to have lots of
 * them.  In particular, if we can have as many tape drives as sorted runs,
 * we can eliminate repeated I/O entirely.
 *
 * Historically, we divided the input into sorted runs using replacement
 * selection, in the form of a priority tree implemented as a heap
 * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
 * for run generation.
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
 * When tuples are dumped in batch after quicksorting, we begin a new run
 * with a new output tape.  If we reach the max number of tapes, we write
 * subsequent runs on the existing tapes in a round-robin fashion.  We will
 * need multiple merge passes to finish the merge in that case.  After the
 * end of the input is reached, we dump out remaining tuples in memory into
 * a final run, then merge the runs.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and replace it
 * with the next tuple from its source tape (if any).  When the heap empties,
 * the merge is complete.  The basic merge algorithm thus needs very little
 * memory --- only M tuples for an M-way merge, and M is constrained to a
 * small number.  However, we can still make good use of our full workMem
 * allocation by pre-reading additional blocks from each source tape.  Without
 * prereading, our access pattern to the temporary file would be very erratic;
 * on average we'd read one block from each of M source tapes during the same
 * time that we're writing M blocks to the output tape, so there is no
 * sequentiality of access at all, defeating the read-ahead methods used by
 * most Unix kernels.  Worse, the output tape gets written into a very random
 * sequence of blocks of the temp file, ensuring that things will be even
 * worse when it comes time to read that tape.  A straightforward merge pass
 * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
 * by prereading from each source tape sequentially, loading about workMem/M
 * bytes from each tape in turn, and making the sequential blocks immediately
 * available for reuse.  This approach helps to localize both read and write
 * accesses.  The pre-reading is handled by logtape.c; we just tell it how
 * much memory to use for the buffers.
 *
 * In the current code we determine the number of input tapes M on the basis
 * of workMem: we want workMem/M to be large enough that we read a fair
 * amount of data each time we read from a tape, so as to maintain the
 * locality of access described above.  Nonetheless, with large workMem we
 * can have many tapes.  The logical "tapes" are implemented by logtape.c,
 * which avoids space wastage by recycling disk space as soon as each block
 * is read from its "tape".
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * This module supports parallel sorting.  Parallel sorts involve coordination
 * among one or more worker processes, and a leader process, each with its own
 * tuplesort state.  The leader process (or, more accurately, the
 * Tuplesortstate associated with a leader process) creates a full tapeset
 * consisting of worker tapes, each holding one run to merge --- a run for
 * every worker process.  These runs are then merged.  Worker processes are
 * guaranteed to produce exactly one output run from their partial input.
 *
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/shmem.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/tuplesort.h"

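/*
 * Illustrative sketch (not part of the original file; the hypothetical
 * guard symbol keeps it out of any real build, and all names here are
 * invented): the balanced k-way merge described in the header comment,
 * reduced to merging k sorted integer runs with a binary min-heap holding
 * the frontmost element of each run, just as the real merge keeps one
 * SortTuple per source tape.
 */
#ifdef TUPLESORT_EXAMPLE_KWAY_MERGE

typedef struct ExampleSource
{
    const int  *vals;           /* one sorted input run */
    int         pos;            /* next unread element */
    int         len;            /* number of elements in the run */
} ExampleSource;

/* Restore the min-heap property at index i; heap orders runs by front value. */
static void
example_sift_down(ExampleSource **heap, int n, int i)
{
    for (;;)
    {
        int         l = 2 * i + 1;
        int         r = l + 1;
        int         m = i;

        if (l < n && heap[l]->vals[heap[l]->pos] < heap[m]->vals[heap[m]->pos])
            m = l;
        if (r < n && heap[r]->vals[heap[r]->pos] < heap[m]->vals[heap[m]->pos])
            m = r;
        if (m == i)
            break;
        {
            ExampleSource *tmp = heap[i];

            heap[i] = heap[m];
            heap[m] = tmp;
        }
        i = m;
    }
}

/* Merge k sorted runs into out[]; returns the number of elements written. */
static int
example_kway_merge(ExampleSource *srcs, int k, int *out)
{
    ExampleSource *heap[64];    /* sketch assumes k <= 64 */
    int         n = 0;
    int         nout = 0;

    for (int i = 0; i < k; i++)
        if (srcs[i].len > 0)
            heap[n++] = &srcs[i];
    for (int i = n / 2 - 1; i >= 0; i--)
        example_sift_down(heap, n, i);

    while (n > 0)
    {
        ExampleSource *top = heap[0];

        out[nout++] = top->vals[top->pos++];    /* emit smallest front */
        if (top->pos == top->len)
            heap[0] = heap[--n];    /* run exhausted: shrink the heap */
        example_sift_down(heap, n, 0);
    }
    return nout;
}

#endif                          /* TUPLESORT_EXAMPLE_KWAY_MERGE */
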
/*
 * Initial size of memtuples array.  We're trying to select this size so that
 * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
 * overhead of allocation might possibly be lowered.  However, we don't
 * consider array sizes less than 1024.
 */
#define INITIAL_MEMTUPSIZE Max(1024, \
                               ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)

/* GUC variables */
#ifdef TRACE_SORT
bool        trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
bool        optimize_bounded_sort = true;
#endif


/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some,
 * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list.  When in use, the
 * slot holds a tuple.
 */
#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
    union SlabSlot *nextfree;
    char        buffer[SLAB_SLOT_SIZE];
} SlabSlot;
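
/*
 * Illustrative sketch (guarded out of the build; names invented): how the
 * SlabSlot union above doubles as a free-list link.  While a slot is
 * unused, its bytes store 'nextfree'; once handed out, the same bytes
 * serve as tuple storage.  The real allocation path lives in
 * init_slab_allocator() and the merge code; this only demonstrates the
 * union trick.
 */
#ifdef TUPLESORT_EXAMPLE_SLAB

static SlabSlot *example_free_head = NULL;

/* Pop a slot from the free list; NULL means "fall back to palloc()". */
static SlabSlot *
example_slab_alloc(void)
{
    SlabSlot   *slot = example_free_head;

    if (slot != NULL)
        example_free_head = slot->nextfree;
    return slot;
}

/* Push a slot back onto the free list, reusing its bytes as the link. */
static void
example_slab_release(SlabSlot *slot)
{
    slot->nextfree = example_free_head;
    example_free_head = slot;
}

#endif                          /* TUPLESORT_EXAMPLE_SLAB */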

/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
    TSS_INITIAL,                /* Loading tuples; still within memory limit */
    TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
    TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
    TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
    TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
    TSS_FINALMERGE              /* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about one
 * block's worth of buffer space.  This ignores the overhead of all the
 * other data structures needed for each tape, but it's probably close
 * enough.
 *
 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
 * input tape, for pre-reading (see discussion at top of file).  This is *in
 * addition to* the one block already included in TAPE_BUFFER_OVERHEAD.
 */
#define MINORDER        6       /* minimum merge order */
#define MAXORDER        500     /* maximum merge order */
#define TAPE_BUFFER_OVERHEAD        BLCKSZ
#define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
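
/*
 * Illustrative sketch (guarded out of the build; names invented, and the
 * exact formula is an assumption --- tuplesort_merge_order() is the
 * authoritative version): a merge-order computation consistent with the
 * constants above.  Budget one tape buffer for the output tape, see how
 * many input tapes (each needing a tape buffer plus a prereading buffer)
 * fit in the remainder, and clamp to [MINORDER, MAXORDER].
 */
#ifdef TUPLESORT_EXAMPLE_ORDER
static int
example_merge_order(int64 allowedMem)
{
    int64       m;

    /* memory left after the output tape's buffer, divided per input tape */
    m = (allowedMem - TAPE_BUFFER_OVERHEAD) /
        (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
    m = Max(m, MINORDER);
    m = Min(m, MAXORDER);
    return (int) m;
}
#endif                          /* TUPLESORT_EXAMPLE_ORDER */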

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
    TuplesortPublic base;
    TupSortStatus status;       /* enumerated value as shown above */
    bool        bounded;        /* did caller specify a maximum number of
                                 * tuples to return? */
    bool        boundUsed;      /* true if we made use of a bounded heap */
    int         bound;          /* if bounded, the maximum number of tuples */
    int64       availMem;       /* remaining memory available, in bytes */
    int64       allowedMem;     /* total memory allowed, in bytes */
    int         maxTapes;       /* max number of input tapes to merge in each
                                 * pass */
    int64       maxSpace;       /* maximum amount of space occupied among sort
                                 * groups, either in-memory or on-disk */
    bool        isMaxSpaceDisk; /* true when maxSpace is value for on-disk
                                 * space, false when it's value for in-memory
                                 * space */
    TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */

    /*
     * This array holds the tuples now in sort memory.  If we are in state
     * INITIAL, the tuples are in no particular order; if we are in state
     * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     * H.  In state SORTEDONTAPE, the array is not used.
     */
    SortTuple  *memtuples;      /* array of SortTuple structs */
    int         memtupcount;    /* number of tuples currently present */
    int         memtupsize;     /* allocated length of memtuples array */
    bool        growmemtuples;  /* memtuples' growth still underway? */

    /*
     * Memory for tuples is sometimes allocated using a simple slab allocator,
     * rather than with palloc().  Currently, we switch to slab allocation
     * when we start merging.  Merging only needs to keep a small, fixed
     * number of tuples in memory at any time, so we can avoid the
     * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     * to hold the tuples.
     *
     * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     * slots.  The allocation is sized to have one slot per tape, plus one
     * additional slot.  We need that many slots to hold all the tuples kept
     * in the heap during merge, plus the one most recently returned to the
     * caller by tuplesort_gettuple.
     *
     * Initially, all the slots are kept in a linked list of free slots.  When
     * a tuple is read from a tape, it is put into the next available slot, if
     * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     * instead.
     *
     * When we're done processing a tuple, we return the slot back to the free
     * list, or pfree() it if it was palloc'd.  We know that a tuple was
     * allocated from the slab if its pointer value is between slabMemoryBegin
     * and slabMemoryEnd.
     *
     * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     * tracking memory usage is not used.
     */
    bool        slabAllocatorUsed;

    char       *slabMemoryBegin;    /* beginning of slab memory arena */
    char       *slabMemoryEnd;  /* end of slab memory arena */
    SlabSlot   *slabFreeHead;   /* head of free list */

    /* Memory used for input and output tape buffers. */
    size_t      tape_buffer_mem;

    /*
     * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
     * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
     * modes), we remember the tuple in 'lastReturnedTuple', so that we can
     * recycle the memory on the next gettuple call.
     */
    void       *lastReturnedTuple;

    /*
     * While building initial runs, this is the current output run number.
     * Afterwards, it is the number of initial runs we made.
     */
    int         currentRun;

    /*
     * Logical tapes, for merging.
     *
     * The initial runs are written in the output tapes.  In each merge pass,
     * the output tapes of the previous pass become the input tapes, and new
     * output tapes are created as needed.  When nInputTapes equals
     * nInputRuns, there is only one merge pass left.
     */
    LogicalTape **inputTapes;
    int         nInputTapes;
    int         nInputRuns;

    LogicalTape **outputTapes;
    int         nOutputTapes;
    int         nOutputRuns;

    LogicalTape *destTape;      /* current output tape */

    /*
     * These variables are used after completion of sorting to keep track of
     * the next tuple to return.  (In the tape case, the tape's current read
     * position is also critical state.)
     */
    LogicalTape *result_tape;   /* actual tape of finished output */
    int         current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */

    /* markpos_xxx holds marked position for mark and restore */
    long        markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
    int         markpos_offset; /* saved "current", or offset in tape block */
    bool        markpos_eof;    /* saved "eof_reached" */

    /*
     * These variables are used during parallel sorting.
     *
     * worker is our worker identifier.  Follows the general convention that
     * -1 identifies a leader tuplesort, while values >= 0 identify worker
     * tuplesorts.  (-1 can also be a serial tuplesort.)
     *
     * shared is mutable shared memory state, which is used to coordinate
     * parallel sorts.
     *
     * nParticipants is the number of worker Tuplesortstates known by the
     * leader to have actually been launched, which implies that they must
     * finish a run that the leader needs to merge.  Typically includes a
     * worker state held by the leader process itself.  Set in the leader
     * Tuplesortstate only.
     */
    int         worker;
    Sharedsort *shared;
    int         nParticipants;

    /*
     * Additional state for managing "abbreviated key" sortsupport routines
     * (which currently may be used by all cases except the hash index case).
     * Tracks the intervals at which the optimization's effectiveness is
     * tested.
     */
    int64       abbrevNext;     /* Tuple # at which to next check
                                 * applicability */

    /*
     * Resource snapshot for time of sort start.
     */
#ifdef TRACE_SORT
    PGRUsage    ru_start;
#endif
};

/*
 * Private mutable state of a parallel tuplesort operation.  This is
 * allocated in shared memory.
 */
struct Sharedsort
{
    /* mutex protects all fields prior to tapes */
    slock_t     mutex;

    /*
     * currentWorker generates ordinal identifier numbers for parallel sort
     * workers.  These start from 0, and are always gapless.
     *
     * Workers increment workersFinished to indicate having finished.  If
     * this is equal to state.nParticipants within the leader, the leader is
     * ready to merge worker runs.
     */
    int         currentWorker;
    int         workersFinished;

    /* Temporary file space */
    SharedFileSet fileset;

    /* Size of tapes flexible array */
    int         nTapes;

    /*
     * Tapes array used by workers to report back information needed by the
     * leader to concatenate all worker tapes into one for merging
     */
    TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Is the given tuple allocated from the slab memory arena?
 */
#define IS_SLAB_SLOT(state, tuple) \
    ((char *) (tuple) >= (state)->slabMemoryBegin && \
     (char *) (tuple) < (state)->slabMemoryEnd)

/*
 * Return the given tuple to the slab memory free list, or free it
 * if it was palloc'd.
 */
#define RELEASE_SLAB_SLOT(state, tuple) \
    do { \
        SlabSlot *buf = (SlabSlot *) tuple; \
        \
        if (IS_SLAB_SLOT((state), buf)) \
        { \
            buf->nextfree = (state)->slabFreeHead; \
            (state)->slabFreeHead = buf; \
        } else \
            pfree(buf); \
    } while(0)

#define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
#define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
#define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
#define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
#define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt)   ((state)->availMem -= (amt))
#define FREEMEM(state,amt)  ((state)->availMem += (amt))
#define SERIAL(state)       ((state)->shared == NULL)
#define WORKER(state)       ((state)->shared && (state)->worker != -1)
#define LEADER(state)       ((state)->shared && (state)->worker == -1)

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
 * representation of the tuple must be followed by another "unsigned int" that
 * is a copy of the length --- so the total tape space used is actually
 * sizeof(unsigned int) more than the stored length value.  This allows
 * reading tuples backwards.  If the random access flag was not specified,
 * the write/read routines may omit the extra length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.  As of 9.6, a
 * separate memory context is used for caller-passed tuples.  Resetting
 * it at certain key increments significantly ameliorates fragmentation.
 * readtup routines use the slab allocator (they cannot use
 * the reset context because it gets deleted at the point that merging
 * begins).
 */
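
/*
 * Illustrative sketch (guarded out of the build; names invented): the
 * on-tape framing described above, writing into a plain memory buffer
 * instead of a LogicalTape.  The leading length word counts itself; the
 * trailing copy is emitted only when backward reads
 * (TUPLESORT_RANDOMACCESS) must be supported.  Returns the number of
 * bytes written.
 */
#ifdef TUPLESORT_EXAMPLE_TAPEFORMAT
static size_t
example_frame_tuple(char *dst, const char *tupdata, unsigned int datalen,
                    bool randomAccess)
{
    unsigned int tuplen = (unsigned int) sizeof(unsigned int) + datalen;
    char       *p = dst;

    memcpy(p, &tuplen, sizeof(tuplen));     /* front length word */
    p += sizeof(tuplen);
    memcpy(p, tupdata, datalen);            /* tuple body */
    p += datalen;
    if (randomAccess)
    {
        memcpy(p, &tuplen, sizeof(tuplen)); /* back length word */
        p += sizeof(tuplen);
    }
    return (size_t) (p - dst);
}
#endif                          /* TUPLESORT_EXAMPLE_TAPEFORMAT */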


static void tuplesort_begin_batch(Tuplesortstate *state);
static bool consider_abort_common(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state, bool mergeruns);
static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_sort_memtuples(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_delete_top(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(LogicalTape *tape, bool eofOK);
static void markrunend(LogicalTape *tape);
static int  worker_get_identifier(Tuplesortstate *state);
static void worker_freeze_result_tape(Tuplesortstate *state);
static void worker_nomergeruns(Tuplesortstate *state);
static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
static void tuplesort_free(Tuplesortstate *state);
static void tuplesort_updatemax(Tuplesortstate *state);

/*
 * Specialized comparators that we can inline into specialized sorts.  The goal
 * is to try to sort two tuples without having to follow the pointers to the
 * comparator or the tuple.
 *
 * XXX: For now, these fall back to comparator functions that will compare the
 * leading datum a second time.
 *
 * XXX: For now, there is no specialization for cases where datum1 is
 * authoritative and we don't even need to fall back to a callback at all (that
 * would be true for types like int4/int8/timestamp/date, but not true for
 * abbreviations of text or multi-key sorts).  There could be!  Is it worth it?
 */

/* Used if first key's comparator is ssup_datum_unsigned_compare */
static pg_attribute_always_inline int
qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
{
    int         compare;

    compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
                                          b->datum1, b->isnull1,
                                          &state->base.sortKeys[0]);
    if (compare != 0)
        return compare;

    /*
     * No need to waste effort calling the tiebreak function when there are no
     * other keys to sort on.
     */
    if (state->base.onlyKey != NULL)
        return 0;

    return state->base.comparetup(a, b, state);
}

#if SIZEOF_DATUM >= 8
/* Used if first key's comparator is ssup_datum_signed_compare */
static pg_attribute_always_inline int
qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
{
    int         compare;

    compare = ApplySignedSortComparator(a->datum1, a->isnull1,
                                        b->datum1, b->isnull1,
                                        &state->base.sortKeys[0]);

    if (compare != 0)
        return compare;

    /*
     * No need to waste effort calling the tiebreak function when there are no
     * other keys to sort on.
     */
    if (state->base.onlyKey != NULL)
        return 0;

    return state->base.comparetup(a, b, state);
}
#endif

/* Used if first key's comparator is ssup_datum_int32_compare */
static pg_attribute_always_inline int
qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
{
    int         compare;

    compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
                                       b->datum1, b->isnull1,
                                       &state->base.sortKeys[0]);

    if (compare != 0)
        return compare;

    /*
     * No need to waste effort calling the tiebreak function when there are no
     * other keys to sort on.
     */
    if (state->base.onlyKey != NULL)
        return 0;

    return state->base.comparetup(a, b, state);
}

/*
 * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
 * any variant of SortTuples, using the appropriate comparetup function.
 * qsort_ssup() is specialized for the case where the comparetup function
 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
 * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
 * common comparison functions on pass-by-value leading datums.
 */

#define ST_SORT qsort_tuple_unsigned
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"

#if SIZEOF_DATUM >= 8
#define ST_SORT qsort_tuple_signed
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"
#endif

#define ST_SORT qsort_tuple_int32
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"

#define ST_SORT qsort_tuple
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE_RUNTIME_POINTER
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DECLARE
#define ST_DEFINE
#include "lib/sort_template.h"

#define ST_SORT qsort_ssup
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, ssup) \
    ApplySortComparator((a)->datum1, (a)->isnull1, \
                        (b)->datum1, (b)->isnull1, (ssup))
#define ST_COMPARE_ARG_TYPE SortSupportData
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"
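
/*
 * Illustrative sketch (guarded out of the build; names invented): the same
 * sort_template.h mechanism works for any element type.  A hypothetical
 * instantiation for plain integers, using the two-argument ST_COMPARE form
 * (no extra argument), would look like this; the instantiations above are
 * the ones actually used by tuplesort.
 */
#ifdef TUPLESORT_EXAMPLE_TEMPLATE
static int
example_int_cmp(int a, int b)
{
    return (a > b) - (a < b);   /* -1/0/+1 without overflow */
}

#define ST_SORT example_sort_int
#define ST_ELEMENT_TYPE int
#define ST_COMPARE(a, b) example_int_cmp(*(a), *(b))
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"
/* afterwards: example_sort_int(array, nelems); */
#endif                          /* TUPLESORT_EXAMPLE_TEMPLATE */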

/*
 * tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a sortopt which is a bitmask of
 * sort options.  See TUPLESORT_* definitions in tuplesort.h.
 */

Tuplesortstate *
tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
{
    Tuplesortstate *state;
    MemoryContext maincontext;
    MemoryContext sortcontext;
    MemoryContext oldcontext;

    /* See leader_takeover_tapes() remarks on random access support */
    if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
        elog(ERROR, "random access disallowed under parallel sort");

    /*
     * Memory context surviving tuplesort_reset.  This memory context holds
     * data which is useful to keep while sorting multiple similar batches.
     */
    maincontext = AllocSetContextCreate(CurrentMemoryContext,
                                        "TupleSort main",
                                        ALLOCSET_DEFAULT_SIZES);

    /*
     * Create a working memory context for one sort operation.  The content
     * of this context is deleted by tuplesort_reset.
     */
    sortcontext = AllocSetContextCreate(maincontext,
                                        "TupleSort sort",
                                        ALLOCSET_DEFAULT_SIZES);

    /*
     * Additionally a working memory context for tuples is setup in
     * tuplesort_begin_batch.
     */

    /*
     * Make the Tuplesortstate within the per-sortstate context.  This way, we
     * don't need a separate pfree() operation for it at shutdown.
     */
    oldcontext = MemoryContextSwitchTo(maincontext);

    state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
    if (trace_sort)
        pg_rusage_init(&state->ru_start);
#endif

    state->base.sortopt = sortopt;
    state->base.tuples = true;
    state->abbrevNext = 10;

    /*
     * workMem is forced to be at least 64KB, the current minimum valid value
     * for the work_mem GUC.  This is a defense against parallel sort callers
     * that divide out memory among many workers in a way that leaves each
     * with very little memory.
     */
    state->allowedMem = Max(workMem, 64) * (int64) 1024;
    state->base.sortcontext = sortcontext;
    state->base.maincontext = maincontext;

    /*
     * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     * see comments in grow_memtuples().
     */
    state->memtupsize = INITIAL_MEMTUPSIZE;
    state->memtuples = NULL;

    /*
     * After all of the other non-parallel-related state, we set up all of
     * the state needed for each batch.
     */
    tuplesort_begin_batch(state);

    /*
     * Initialize parallel-related state based on coordination information
     * from caller
     */
    if (!coordinate)
    {
        /* Serial sort */
        state->shared = NULL;
        state->worker = -1;
        state->nParticipants = -1;
    }
    else if (coordinate->isWorker)
    {
        /* Parallel worker produces exactly one final run from all input */
        state->shared = coordinate->sharedsort;
        state->worker = worker_get_identifier(state);
        state->nParticipants = -1;
    }
    else
    {
        /* Parallel leader state only used for final merge */
        state->shared = coordinate->sharedsort;
        state->worker = -1;
        state->nParticipants = coordinate->nParticipants;
        Assert(state->nParticipants >= 1);
    }

    MemoryContextSwitchTo(oldcontext);

    return state;
}
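
/*
 * Illustrative sketch (guarded out of the build; the function name is
 * invented): the caller lifecycle documented above tuplesort_begin_common.
 * Real callers construct the state through a tuplesort_begin_xxx variant
 * in tuplesortvariants.c, which itself calls tuplesort_begin_common; the
 * put/get steps below are placeholders for whichever variant the caller
 * uses, since only the ordering matters here.
 */
#ifdef TUPLESORT_EXAMPLE_LIFECYCLE
static void
example_sort_lifecycle(void)
{
    /* 1. construct (normally via a tuplesort_begin_xxx variant) */
    Tuplesortstate *state = tuplesort_begin_common(work_mem, NULL,
                                                   TUPLESORT_NONE);

    /* 2. feed all input: one tuplesort_putXXX call per tuple */

    /* 3. finish loading and sort */
    tuplesort_performsort(state);

    /* 4. drain in order with tuplesort_getXXX until it returns false/NULL */

    /* 5. release memory and temp files */
    tuplesort_end(state);
}
#endif                          /* TUPLESORT_EXAMPLE_LIFECYCLE */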

/*
 * tuplesort_begin_batch
 *
 * Set up, or reset, all state needed for processing a new set of tuples
 * with this sort state.  Called both from tuplesort_begin_common (the first
 * time sorting with this sort state) and tuplesort_reset (for subsequent
 * usages).
 */
static void
tuplesort_begin_batch(Tuplesortstate *state)
{
    MemoryContext oldcontext;

    oldcontext = MemoryContextSwitchTo(state->base.maincontext);

    /*
     * Caller tuple (e.g. IndexTuple) memory context.
     *
     * A dedicated child context used exclusively for caller passed tuples
     * eases memory management.  Resetting at key points reduces
     * fragmentation.  Note that the memtuples array of SortTuples is
     * allocated in the parent context, not this context, because there is no
     * need to free memtuples early.  For bounded sorts, tuples may be pfreed
     * in any order, so we use a regular aset.c context so that it can make
     * use of free'd memory.  When the sort is not bounded, we make use of a
     * generation.c context as this keeps allocations more compact with less
     * wastage.  Allocations are also slightly more CPU efficient.
     */
    if (state->base.sortopt & TUPLESORT_ALLOWBOUNDED)
        state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
                                                         "Caller tuples",
                                                         ALLOCSET_DEFAULT_SIZES);
    else
        state->base.tuplecontext = GenerationContextCreate(state->base.sortcontext,
                                                           "Caller tuples",
                                                           ALLOCSET_DEFAULT_SIZES);


    state->status = TSS_INITIAL;
    state->bounded = false;
    state->boundUsed = false;

    state->availMem = state->allowedMem;

    state->tapeset = NULL;

    state->memtupcount = 0;

    /*
     * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     * see comments in grow_memtuples().
     */
    state->growmemtuples = true;
    state->slabAllocatorUsed = false;
    if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
    {
        pfree(state->memtuples);
        state->memtuples = NULL;
        state->memtupsize = INITIAL_MEMTUPSIZE;
    }
    if (state->memtuples == NULL)
    {
        state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    }

    /* workMem must be large enough for the minimal memtuples array */
    if (LACKMEM(state))
        elog(ERROR, "insufficient memory allowed for sort");

    state->currentRun = 0;

    /*
     * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     * inittapes(), if needed.
     */

    state->result_tape = NULL;  /* flag that result tape has not been formed */

    MemoryContextSwitchTo(oldcontext);
}

/*
 * tuplesort_set_bound
 *
 *  Advise tuplesort that at most the first N result tuples are required.
 *
 * Must be called before inserting any tuples.  (Actually, we could allow it
 * as long as the sort hasn't spilled to disk, but there seems no need for
 * delayed calls at the moment.)
 *
 * This is a hint only.  The tuplesort may still return more tuples than
 * requested.  Parallel leader tuplesorts will always ignore the hint.
 */
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
{
    /* Assert we're called before loading any tuples */
    Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
    /* Assert we allow bounded sorts */
    Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
    /* Can't set the bound twice, either */
    Assert(!state->bounded);
    /* Also, this shouldn't be called in a parallel worker */
    Assert(!WORKER(state));

    /* Parallel leader allows but ignores hint */
    if (LEADER(state))
        return;

#ifdef DEBUG_BOUNDED_SORT
    /* Honor GUC setting that disables the feature (for easy testing) */
    if (!optimize_bounded_sort)
        return;
#endif

    /* We want to be able to compute bound * 2, so limit the setting */
    if (bound > (int64) (INT_MAX / 2))
        return;

    state->bounded = true;
    state->bound = (int) bound;

    /*
     * Bounded sorts are not an effective target for abbreviated key
     * optimization.  Disable by setting state to be consistent with no
     * abbreviation support.
     */
    state->base.sortKeys->abbrev_converter = NULL;
    if (state->base.sortKeys->abbrev_full_comparator)
        state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;

    /* Not strictly necessary, but be tidy */
    state->base.sortKeys->abbrev_abort = NULL;
    state->base.sortKeys->abbrev_full_comparator = NULL;
}
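
/*
 * Illustrative sketch (guarded out of the build; the function name is
 * invented): how a top-N caller would use the hint above.  The bound must
 * be declared before any tuples are loaded, and only on a sort opened with
 * TUPLESORT_ALLOWBOUNDED; 'state' is assumed to come from a
 * tuplesort_begin_xxx variant that has already set up the sort keys.
 */
#ifdef TUPLESORT_EXAMPLE_BOUND
static void
example_request_topn(Tuplesortstate *state, int64 n)
{
    /* e.g. for a LIMIT n query: only the first n output tuples matter */
    tuplesort_set_bound(state, n);
}
#endif                          /* TUPLESORT_EXAMPLE_BOUND */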

/*
 * tuplesort_used_bound
 *
 * Allow callers to find out if the sort state was able to use a bound.
 */
bool
tuplesort_used_bound(Tuplesortstate *state)
{
    return state->boundUsed;
}

/*
 * tuplesort_free
 *
 *  Internal routine for freeing resources of tuplesort.
 */
static void
tuplesort_free(Tuplesortstate *state)
{
    /* context swap probably not needed, but let's be safe */
    MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);

#ifdef TRACE_SORT
    long        spaceUsed;

    if (state->tapeset)
        spaceUsed = LogicalTapeSetBlocks(state->tapeset);
    else
        spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

    /*
     * Delete temporary "tape" files, if any.
     *
     * Note: want to include this in reported total cost of sort, hence need
     * for two #ifdef TRACE_SORT sections.
     *
     * We don't bother to destroy the individual tapes here.  They will go
     * away with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     * finished tapes already.)
     */
    if (state->tapeset)
        LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
    if (trace_sort)
    {
        if (state->tapeset)
            elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
                 SERIAL(state) ? "external sort" : "parallel external sort",
                 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
        else
            elog(LOG, "%s of worker %d ended, %ld KB used: %s",
                 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
                 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
    }

    TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
#else

    /*
     * If you disabled TRACE_SORT, you can still probe sort__done, but you
     * ain't getting space-used stats.
     */
    TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
#endif

    FREESTATE(state);
    MemoryContextSwitchTo(oldcontext);

    /*
     * Free the per-sort memory context, thereby releasing all working memory.
     */
    MemoryContextReset(state->base.sortcontext);
}

/*
 * tuplesort_end
 *
 *  Release resources and clean up.
 *
 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
 * pointing to garbage.  Be careful not to attempt to use or free such
 * pointers afterwards!
 */
void
tuplesort_end(Tuplesortstate *state)
{
    tuplesort_free(state);

    /*
     * Free the main memory context, including the Tuplesortstate struct
     * itself.
     */
    MemoryContextDelete(state->base.maincontext);
}

/*
 * tuplesort_updatemax
 *
 *  Update maximum resource usage statistics.
 */
static void
tuplesort_updatemax(Tuplesortstate *state)
{
    int64       spaceUsed;
    bool        isSpaceDisk;

    /*
     * Note: it might seem we should provide both memory and disk usage for a
     * disk-based sort.  However, the current code doesn't track memory space
     * accurately once we have begun to return tuples to the caller (since we
     * don't account for pfree's the caller is expected to do), so we cannot
     * rely on availMem in a disk sort.  This does not seem worth the overhead
     * to fix.  Is it worth creating an API for the memory context code to
     * tell us how much is actually used in sortcontext?
     */
    if (state->tapeset)
    {
        isSpaceDisk = true;
        spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
    }
    else
    {
        isSpaceDisk = false;
        spaceUsed = state->allowedMem - state->availMem;
    }

    /*
     * Sort evicts data to the disk when it wasn't able to fit that data into
     * main memory.  This is why we assume space used on the disk to be more
     * important for tracking resource usage than space used in memory.  Note
     * that the amount of space occupied by some tupleset on the disk might
     * be less than the amount of space occupied by the same tupleset in
     * memory, due to its more compact representation.
     */
    if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
        (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
    {
        state->maxSpace = spaceUsed;
        state->isMaxSpaceDisk = isSpaceDisk;
        state->maxSpaceStatus = state->status;
    }
}

/*
 * tuplesort_reset
 *
 *  Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
 *  meta-information in.  After tuplesort_reset, tuplesort is ready to start
 *  a new sort.  This allows tuple sort states to be recycled between sorts
 *  (saving resources) when sorting multiple small batches.
 */
void
tuplesort_reset(Tuplesortstate *state)
{
    tuplesort_updatemax(state);
    tuplesort_free(state);

    /*
     * After we've freed up per-batch memory, re-setup all of the state
     * common to both the first batch and any subsequent batch.
     */
    tuplesort_begin_batch(state);

    state->lastReturnedTuple = NULL;
    state->slabMemoryBegin = NULL;
    state->slabMemoryEnd = NULL;
    state->slabFreeHead = NULL;
}
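
/*
 * Illustrative sketch (guarded out of the build; the function name is
 * invented): the batch-reuse pattern tuplesort_reset() enables --- one
 * state, many small sorts, with no teardown and rebuild between batches.
 * "Load/drain a batch" stands in for the variant-specific put/get calls.
 */
#ifdef TUPLESORT_EXAMPLE_RESET
static void
example_sort_batches(Tuplesortstate *state, int nbatches)
{
    for (int i = 0; i < nbatches; i++)
    {
        /* ... tuplesort_putXXX every tuple of batch i ... */
        tuplesort_performsort(state);
        /* ... tuplesort_getXXX until the batch is drained ... */
        if (i + 1 < nbatches)
            tuplesort_reset(state); /* keep the state for the next batch */
    }
    tuplesort_end(state);
}
#endif                          /* TUPLESORT_EXAMPLE_RESET */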

/*
 * Grow the memtuples[] array, if possible within our memory constraint.  We
 * must not exceed INT_MAX tuples in memory or the caller-provided memory
 * limit.  Return true if we were able to enlarge the array, false if not.
 *
 * Normally, at each increment we double the size of the array.  When doing
 * that would exceed a limit, we attempt one last, smaller increase (and then
 * clear the growmemtuples flag so we don't try any more).  That allows us to
 * use memory as fully as permitted; sticking to the pure doubling rule could
 * result in almost half going unused.  Because availMem moves around with
 * tuple addition/removal, we need some rule to prevent making repeated small
 * increases in memtupsize, which would just be useless thrashing.  The
 * growmemtuples flag accomplishes that and also prevents useless
 * recalculations in this function.
 */
static bool
grow_memtuples(Tuplesortstate *state)
{
    int         newmemtupsize;
    int         memtupsize = state->memtupsize;
    int64       memNowUsed = state->allowedMem - state->availMem;

    /* Forget it if we've already maxed out memtuples, per comment above */
    if (!state->growmemtuples)
        return false;

    /* Select new value of memtupsize */
    if (memNowUsed <= state->availMem)
    {
        /*
         * We've used no more than half of allowedMem; double our usage,
         * clamping at INT_MAX tuples.
         */
        if (memtupsize < INT_MAX / 2)
            newmemtupsize = memtupsize * 2;
        else
        {
            newmemtupsize = INT_MAX;
            state->growmemtuples = false;
        }
    }
    else
    {
        /*
         * This will be the last increment of memtupsize.  Abandon doubling
         * strategy and instead increase as much as we safely can.
         *
         * To stay within allowedMem, we can't increase memtupsize by more
         * than availMem / sizeof(SortTuple) elements.  In practice, we want
         * to increase it by considerably less, because we need to leave some
         * space for the tuples to which the new array slots will refer.  We
         * assume the new tuples will be about the same size as the tuples
         * we've already seen, and thus we can extrapolate from the space
         * consumption so far to estimate an appropriate new size for the
         * memtuples array.  The optimal value might be higher or lower than
         * this estimate, but it's hard to know that in advance.  We again
         * clamp at INT_MAX tuples.
         *
         * This calculation is safe against enlarging the array so much that
         * LACKMEM becomes true, because the memory currently used includes
         * the present array; thus, there would be enough allowedMem for the
         * new array elements even if no other memory were currently used.
         *
         * We do the arithmetic in float8, because otherwise the product of
         * memtupsize and allowedMem could overflow.  Any inaccuracy in the
         * result should be insignificant; but even if we computed a
         * completely insane result, the checks below will prevent anything
         * really bad from happening.
         */
        double      grow_ratio;

        grow_ratio = (double) state->allowedMem / (double) memNowUsed;
        if (memtupsize * grow_ratio < INT_MAX)
            newmemtupsize = (int) (memtupsize * grow_ratio);
        else
            newmemtupsize = INT_MAX;

        /* We won't make any further enlargement attempts */
        state->growmemtuples = false;
    }

    /* Must enlarge array by at least one element, else report failure */
    if (newmemtupsize <= memtupsize)
        goto noalloc;

    /*
     * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
     * to ensure our request won't be rejected.  Note that we can easily
     * exhaust address space before facing this outcome.  (This is presently
     * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
     * don't rely on that at this distance.)
     */
    if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    {
        newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
        state->growmemtuples = false;   /* can't grow any more */
    }

    /*
     * We need to be sure that we do not cause LACKMEM to become true, else
     * the space management algorithm will go nuts.  The code above should
     * never generate a dangerous request, but to be safe, check explicitly
     * that the array growth fits within availMem.  (We could still cause
     * LACKMEM if the memory chunk overhead associated with the memtuples
     * array were to increase.  That shouldn't happen because we chose the
     * initial array size large enough to ensure that palloc will be treating
     * both old and new arrays as separate chunks.  But we'll check LACKMEM
     * explicitly below just in case.)
     */
    if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
        goto noalloc;

    /* OK, do it */
    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    state->memtupsize = newmemtupsize;
    state->memtuples = (SortTuple *)
        repalloc_huge(state->memtuples,
                      state->memtupsize * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    if (LACKMEM(state))
        elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    return true;

noalloc:
    /* If for any reason we didn't realloc, shut off future attempts */
    state->growmemtuples = false;
    return false;
}
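
/*
 * Illustrative sketch (guarded out of the build; the function name is
 * invented): grow_memtuples()'s last-increment estimate in isolation.
 * Scale the array by the ratio of total budget to current usage, so the
 * array slots and the tuples they will point at are extrapolated together.
 * E.g. with 1024 slots and 48 kB used out of a 64 kB budget, the final
 * size is 1024 * 64/48 = 1365 slots.  (Clamping and safety checks are
 * omitted here; the real function applies them.)
 */
#ifdef TUPLESORT_EXAMPLE_GROW
static int
example_final_growth(int memtupsize, int64 memNowUsed, int64 allowedMem)
{
    double      grow_ratio = (double) allowedMem / (double) memNowUsed;

    return (int) (memtupsize * grow_ratio);
}
#endif                          /* TUPLESORT_EXAMPLE_GROW */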
1185 :
1186 : /*
1187 : * Shared code for tuple and datum cases.
1188 : */
1189 : void
256 akorotkov 1190 GNC 19235664 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbrev)
1191 : {
1192 19235664 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
256 akorotkov 1193 ECB :
1892 rhaas 1194 GNC 19235664 : Assert(!LEADER(state));
1195 :
1196 : /* Count the size of the out-of-line data */
256 akorotkov 1197 19235664 : if (tuple->tuple != NULL)
1198 18327022 : USEMEM(state, GetMemoryChunkSpace(tuple->tuple));
1199 :
1200 19235664 : if (!useAbbrev)
1201 : {
1202 : /*
1203 : * Leave ordinary Datum representation, or NULL value. If there is a
256 akorotkov 1204 EUB : * converter it won't expect NULL values, and cost model is not
1205 : * required to account for NULL, so in that case we avoid calling
1206 : * converter and just set datum1 to zeroed representation (to be
1207 : * consistent, and to support cheap inequality tests for NULL
1208 : * abbreviated keys).
256 akorotkov 1209 ECB : */
1210 : }
256 akorotkov 1211 CBC 2367567 : else if (!consider_abort_common(state))
1212 : {
1213 : /* Store abbreviated key representation */
256 akorotkov 1214 GNC 2367490 : tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1215 : state->base.sortKeys);
1216 : }
1217 : else
1218 : {
256 akorotkov 1219 ECB : /*
1220 : * Set state to be consistent with never trying abbreviation.
1221 : *
1222 : * Alter datum1 representation in already-copied tuples, so as to
1223 : * ensure a consistent representation (current tuple was just
1224 : * handled). It does not matter if some dumped tuples are already
1225 : * sorted on tape, since serialized tuples lack abbreviated keys
1226 : * (TSS_BUILDRUNS state prevents control reaching here in any case).
1227 : */
256 akorotkov 1228 GNC 77 : REMOVEABBREV(state, state->memtuples, state->memtupcount);
1229 : }
1230 :
8575 tgl 1231 GIC 19235664 : switch (state->status)
8575 tgl 1232 ECB : {
7836 bruce 1233 CBC 16781339 : case TSS_INITIAL:
1234 :
8575 tgl 1235 ECB : /*
1236 : * Save the tuple into the unsorted array. First, grow the array
1237 : * as needed. Note that we try to grow the array when there is
1238 : * still one free slot remaining --- if we fail, there'll still be
1239 : * room to store the incoming tuple, and then we'll switch to
1240 : * tape-based operation.
1241 : */
6251 tgl 1242 GIC 16781339 : if (state->memtupcount >= state->memtupsize - 1)
8575 tgl 1243 ECB : {
6251 tgl 1244 CBC 5244 : (void) grow_memtuples(state);
6251 tgl 1245 GIC 5244 : Assert(state->memtupcount < state->memtupsize);
1246 : }
6251 tgl 1247 CBC 16781339 : state->memtuples[state->memtupcount++] = *tuple;
8397 bruce 1248 ECB :
1249 : /*
1250 : * Check if it's time to switch over to a bounded heapsort. We do
1251 : * so if the input tuple count exceeds twice the desired tuple
1252 : * count (this is a heuristic for where heapsort becomes cheaper
1253 : * than a quicksort), or if we've just filled workMem and have
1254 : * enough tuples to meet the bound.
5819 tgl 1255 : *
1256 : * Note that once we enter TSS_BOUNDED state we will always try to
1257 : * complete the sort that way. In the worst case, if later input
1258 : * tuples are larger than earlier ones, this might cause us to
1259 : * exceed workMem significantly.
1260 : */
5819 tgl 1261 CBC 16781339 : if (state->bounded &&
1262 21874 : (state->memtupcount > state->bound * 2 ||
1263 21674 : (state->memtupcount > state->bound && LACKMEM(state))))
1264 : {
1265 : #ifdef TRACE_SORT
5819 tgl 1266 GIC 200 : if (trace_sort)
5819 tgl 1267 UIC 0 : elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1268 : state->memtupcount,
1269 : pg_rusage_show(&state->ru_start));
1270 : #endif
5819 tgl 1271 GIC 200 : make_bounded_heap(state);
256 akorotkov 1272 GNC 200 : MemoryContextSwitchTo(oldcontext);
5819 tgl 1273 GIC 200 : return;
1274 : }
1275 :
1276 : /*
1277 : * Done if we still fit in available memory and have array slots.
1278 : */
6251 tgl 1279 CBC 16781139 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1280 : {
256 akorotkov 1281 GNC 16781081 : MemoryContextSwitchTo(oldcontext);
8575 tgl 1282 CBC 16781081 : return;
1283 : }
1284 :
8575 tgl 1285 ECB : /*
1286 : * Nope; time to switch to tape-based operation.
1287 : */
1892 rhaas 1288 GIC 58 : inittapes(state, true);
1289 :
1290 : /*
1291 : * Dump all tuples.
1292 : */
8575 tgl 1293 58 : dumptuples(state, false);
1294 58 : break;
5819 tgl 1295 ECB :
5819 tgl 1296 CBC 1868011 : case TSS_BOUNDED:
5624 bruce 1297 ECB :
5819 tgl 1298 : /*
1299 : * We don't want to grow the array here, so check whether the new
1300 : * tuple can be discarded before putting it in. This should be a
1301 : * good speed optimization, too, since when there are many more
1302 : * input tuples than the bound, most input tuples can be discarded
1303 : * with just this one comparison. Note that because we currently
1304 : * have the sort direction reversed, we must check for <= not >=.
1305 : */
5819 tgl 1306 CBC 1868011 : if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1307 : {
5819 tgl 1308 ECB : /* new tuple <= top of the heap, so we can discard it */
5819 tgl 1309 GBC 1616559 : free_sort_tuple(state, tuple);
4037 rhaas 1310 GIC 1616559 : CHECK_FOR_INTERRUPTS();
1311 : }
1312 : else
1313 : {
1314 : /* discard top of heap, replacing it with the new tuple */
5819 tgl 1315 251452 : free_sort_tuple(state, &state->memtuples[0]);
2018 rhaas 1316 251452 : tuplesort_heap_replace_top(state, tuple);
1317 : }
5819 tgl 1318 1868011 : break;
1319 :
8575 1320 586314 : case TSS_BUILDRUNS:
8397 bruce 1321 ECB :
1322 : /*
1323 : * Save the tuple into the unsorted array (there must be space)
1324 : */
2018 rhaas 1325 GIC 586314 : state->memtuples[state->memtupcount++] = *tuple;
8397 bruce 1326 ECB :
1327 : /*
2018 rhaas 1328 : * If we are over the memory limit, dump all tuples.
8575 tgl 1329 : */
8575 tgl 1330 CBC 586314 : dumptuples(state, false);
8575 tgl 1331 GIC 586314 : break;
1332 :
8575 tgl 1333 UIC 0 : default:
7198 tgl 1334 LBC 0 : elog(ERROR, "invalid tuplesort state");
8575 tgl 1335 ECB : break;
1336 : }
256 akorotkov 1337 GNC 2454383 : MemoryContextSwitchTo(oldcontext);
1338 : }
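
/*
 * Illustrative sketch (hypothetical, not part of tuplesort.c): the
 * TSS_BOUNDED branch above is the classic top-N pattern, shown here on a
 * plain max-heap of ints with bound >= 1.  Keeping the *largest* retained
 * value at the root lets most inputs be rejected with a single
 * comparison, just as the code above discards tuples that compare <= the
 * heap top.
 */
static void
sift_down(int *heap, int n, int i)
{
    for (;;)
    {
        int left = 2 * i + 1,
            right = 2 * i + 2,
            largest = i;

        if (left < n && heap[left] > heap[largest])
            largest = left;
        if (right < n && heap[right] > heap[largest])
            largest = right;
        if (largest == i)
            break;
        int tmp = heap[i]; heap[i] = heap[largest]; heap[largest] = tmp;
        i = largest;
    }
}

/* Keep the 'bound' smallest incoming values in heap[0..bound-1]. */
static void
topn_put(int *heap, int bound, int *count, int value)
{
    if (*count < bound)
    {
        /* heap not full yet: append the new value and sift it up */
        int i = (*count)++;

        heap[i] = value;
        while (i > 0 && heap[(i - 1) / 2] < heap[i])
        {
            int parent = (i - 1) / 2;
            int tmp = heap[i]; heap[i] = heap[parent]; heap[parent] = tmp;
            i = parent;
        }
    }
    else if (value < heap[0])
    {
        /* replace the largest retained value, then restore the heap */
        heap[0] = value;
        sift_down(heap, bound, 0);
    }
    /* else: value >= heap[0]; discard it after just one comparison */
}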
1339 :
1340 : static bool
3002 rhaas 1341 GIC 2367567 : consider_abort_common(Tuplesortstate *state)
1342 : {
256 akorotkov 1343 GNC 2367567 : Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1344 2367567 : Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1345 2367567 : Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
3002 rhaas 1346 ECB :
1347 : /*
1348 : * Check effectiveness of abbreviation optimization. Consider aborting
1349 : * when still within memory limit.
1350 : */
3002 rhaas 1351 GIC 2367567 : if (state->status == TSS_INITIAL &&
3002 rhaas 1352 CBC 2100047 : state->memtupcount >= state->abbrevNext)
1353 : {
3002 rhaas 1354 GIC 3163 : state->abbrevNext *= 2;
1355 :
1356 : /*
1357 : * Check opclass-supplied abbreviation abort routine. It may indicate
2878 bruce 1358 ECB : * that abbreviation should not proceed.
3002 rhaas 1359 EUB : */
256 akorotkov 1360 GNC 3163 : if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1361 : state->base.sortKeys))
3002 rhaas 1362 GIC 3086 : return false;
1363 :
1364 : /*
3002 rhaas 1365 ECB : * Finally, restore authoritative comparator, and indicate that
1366 : * abbreviation is not in play by setting abbrev_converter to NULL
1367 : */
256 akorotkov 1368 GNC 77 : state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1369 77 : state->base.sortKeys[0].abbrev_converter = NULL;
1370 : /* Not strictly necessary, but be tidy */
1371 77 : state->base.sortKeys[0].abbrev_abort = NULL;
1372 77 : state->base.sortKeys[0].abbrev_full_comparator = NULL;
1373 :
3002 rhaas 1374 ECB : /* Give up - expect original pass-by-value representation */
3002 rhaas 1375 CBC 77 : return true;
3002 rhaas 1376 ECB : }
1377 :
3002 rhaas 1378 GIC 2364404 : return false;
3002 rhaas 1379 ECB : }
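
/*
 * Illustrative sketch (hypothetical, not part of tuplesort.c): the
 * abbrevNext doubling above amortizes the cost of the abort check.
 * Probing at tuple counts c, 2c, 4c, 8c, ... means only O(log N) calls
 * to the opclass's abort callback over N tuples, keeping it off the
 * per-tuple hot path.
 */
#include <stdbool.h>

struct abbrev_probe
{
    long    next_check;     /* plays the role of state->abbrevNext */
};

static bool
time_to_check(struct abbrev_probe *p, long ntuples)
{
    if (ntuples < p->next_check)
        return false;
    p->next_check *= 2;     /* same geometric schedule as above */
    return true;
}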
1380 :
8575 tgl 1381 : /*
1382 : * All tuples have been provided; finish the sort.
1383 : */
1384 : void
8575 tgl 1385 GIC 134315 : tuplesort_performsort(Tuplesortstate *state)
1386 : {
256 akorotkov 1387 GNC 134315 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
6251 tgl 1388 ECB :
1389 : #ifdef TRACE_SORT
6397 tgl 1390 GIC 134315 : if (trace_sort)
1620 pg 1391 LBC 0 : elog(LOG, "performsort of worker %d starting: %s",
1392 : state->worker, pg_rusage_show(&state->ru_start));
1393 : #endif
1394 :
8575 tgl 1395 GIC 134315 : switch (state->status)
1396 : {
7836 bruce 1397 CBC 134057 : case TSS_INITIAL:
8397 bruce 1398 ECB :
1399 : /*
 1400 : * We were able to accumulate all the tuples within the allowed
 1401 : * amount of memory, or the leader is about to take over the worker tapes.
1402 : */
1892 rhaas 1403 GIC 134057 : if (SERIAL(state))
1892 rhaas 1404 ECB : {
1405 : /* Just qsort 'em and we're done */
1892 rhaas 1406 CBC 133781 : tuplesort_sort_memtuples(state);
1892 rhaas 1407 GIC 133739 : state->status = TSS_SORTEDINMEM;
1892 rhaas 1408 ECB : }
1892 rhaas 1409 CBC 276 : else if (WORKER(state))
1410 : {
1411 : /*
1892 rhaas 1412 ECB : * Parallel workers must still dump out tuples to tape. No
1413 : * merge is required to produce single output run, though.
1414 : */
1892 rhaas 1415 GIC 205 : inittapes(state, false);
1416 205 : dumptuples(state, true);
1417 205 : worker_nomergeruns(state);
1418 205 : state->status = TSS_SORTEDONTAPE;
1419 : }
1892 rhaas 1420 ECB : else
1421 : {
1422 : /*
1423 : * Leader will take over worker tapes and merge worker runs.
1424 : * Note that mergeruns sets the correct state->status.
1425 : */
1892 rhaas 1426 GIC 71 : leader_takeover_tapes(state);
1427 71 : mergeruns(state);
1428 : }
8575 tgl 1429 CBC 134015 : state->current = 0;
8575 tgl 1430 GIC 134015 : state->eof_reached = false;
1892 rhaas 1431 CBC 134015 : state->markpos_block = 0L;
8575 tgl 1432 GIC 134015 : state->markpos_offset = 0;
1433 134015 : state->markpos_eof = false;
1434 134015 : break;
1435 :
5819 1436 200 : case TSS_BOUNDED:
1437 :
5819 tgl 1438 ECB : /*
1439 : * We were able to accumulate all the tuples required for output
1440 : * in memory, using a heap to eliminate excess tuples. Now we
1441 : * have to transform the heap to a properly-sorted array.
1442 : * Note that sort_bounded_heap sets the correct state->status.
1443 : */
5699 tgl 1444 CBC 200 : sort_bounded_heap(state);
5819 1445 200 : state->current = 0;
1446 200 : state->eof_reached = false;
5819 tgl 1447 GIC 200 : state->markpos_offset = 0;
1448 200 : state->markpos_eof = false;
5819 tgl 1449 CBC 200 : break;
1450 :
8575 tgl 1451 GIC 58 : case TSS_BUILDRUNS:
1452 :
1453 : /*
1454 : * Finish tape-based sort. First, flush all tuples remaining in
6385 bruce 1455 ECB : * memory out to tape; then merge until we have a single remaining
1456 : * run (or, if !randomAccess and !WORKER(), one run per tape).
1892 rhaas 1457 : * Note that mergeruns sets the correct state->status.
8575 tgl 1458 : */
8575 tgl 1459 GIC 58 : dumptuples(state, true);
1460 58 : mergeruns(state);
1461 58 : state->eof_reached = false;
8575 tgl 1462 CBC 58 : state->markpos_block = 0L;
1463 58 : state->markpos_offset = 0;
8575 tgl 1464 GIC 58 : state->markpos_eof = false;
1465 58 : break;
1466 :
8575 tgl 1467 UIC 0 : default:
7198 1468 0 : elog(ERROR, "invalid tuplesort state");
1469 : break;
1470 : }
6397 tgl 1471 ECB :
1472 : #ifdef TRACE_SORT
6397 tgl 1473 GIC 134273 : if (trace_sort)
1474 : {
6241 tgl 1475 UIC 0 : if (state->status == TSS_FINALMERGE)
1620 pg 1476 0 : elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1477 : state->worker, state->nInputTapes,
1478 : pg_rusage_show(&state->ru_start));
1479 : else
1620 pg 1480 LBC 0 : elog(LOG, "performsort of worker %d done: %s",
1481 : state->worker, pg_rusage_show(&state->ru_start));
1482 : }
1483 : #endif
1484 :
6251 tgl 1485 GIC 134273 : MemoryContextSwitchTo(oldcontext);
8575 tgl 1486 CBC 134273 : }
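
/*
 * Illustrative sketch of the caller-side lifecycle that drives the state
 * machine above (hypothetical code; the begin/put/get argument lists vary
 * across major versions, and this assumes the v15-era datum API from
 * tuplesort.h).  All input is loaded first, tuplesort_performsort()
 * finishes the sort, and only then are values fetched back.
 */
static void
sort_some_int4s(Datum *values, int nvalues, int work_mem_kb)
{
    Tuplesortstate *sortstate;
    Datum       val;
    bool        isnull;
    Datum       abbrev;

    sortstate = tuplesort_begin_datum(INT4OID, Int4LessOperator,
                                      InvalidOid, false,
                                      work_mem_kb, NULL, TUPLESORT_NONE);

    for (int i = 0; i < nvalues; i++)
        tuplesort_putdatum(sortstate, values[i], false);

    /* all tuples provided; run the TSS_INITIAL/BOUNDED/BUILDRUNS endgame */
    tuplesort_performsort(sortstate);

    while (tuplesort_getdatum(sortstate, true, &val, &isnull, &abbrev))
    {
        /* consume val here */
    }

    tuplesort_end(sortstate);
}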
1487 :
8575 tgl 1488 ECB : /*
1489 : * Internal routine to fetch the next tuple in either forward or back
2062 peter_e 1490 : * direction into *stup. Returns false if no more tuples.
1491 : * Returned tuple belongs to tuplesort memory context, and must not be freed
1492 : * by caller. Note that fetched tuple is stored in memory that may be
1493 : * recycled by any future fetch.
8575 tgl 1494 : */
1495 : bool
6251 tgl 1496 CBC 17912524 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
2309 rhaas 1497 ECB : SortTuple *stup)
1498 : {
1499 : unsigned int tuplen;
2299 heikki.linnakangas 1500 : size_t nmoved;
1501 :
1892 rhaas 1502 GIC 17912524 : Assert(!WORKER(state));
1503 :
8575 tgl 1504 17912524 : switch (state->status)
1505 : {
1506 15498267 : case TSS_SORTEDINMEM:
256 akorotkov 1507 GNC 15498267 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
2379 heikki.linnakangas 1508 CBC 15498267 : Assert(!state->slabAllocatorUsed);
8575 tgl 1509 GIC 15498267 : if (forward)
1510 : {
1511 15498234 : if (state->current < state->memtupcount)
1512 : {
6251 tgl 1513 CBC 15365297 : *stup = state->memtuples[state->current++];
1514 15365297 : return true;
6251 tgl 1515 ECB : }
8575 tgl 1516 GIC 132937 : state->eof_reached = true;
5819 tgl 1517 ECB :
1518 : /*
1519 : * Complain if caller tries to retrieve more tuples than
1520 : * originally asked for in a bounded sort. This is because
1521 : * returning EOF here might be the wrong thing.
1522 : */
5819 tgl 1523 GIC 132937 : if (state->bounded && state->current >= state->bound)
5819 tgl 1524 UIC 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1525 :
6251 tgl 1526 GIC 132937 : return false;
8575 tgl 1527 ECB : }
1528 : else
1529 : {
8575 tgl 1530 GIC 33 : if (state->current <= 0)
6251 tgl 1531 UIC 0 : return false;
1532 :
1533 : /*
 1534 : * If all tuples have already been fetched, return the last
 1535 : * tuple; otherwise, return the tuple before the last one returned.
8575 tgl 1536 ECB : */
8575 tgl 1537 CBC 33 : if (state->eof_reached)
1538 6 : state->eof_reached = false;
1539 : else
1540 : {
8397 bruce 1541 GIC 27 : state->current--; /* last returned tuple */
8575 tgl 1542 27 : if (state->current <= 0)
6251 1543 3 : return false;
1544 : }
1545 30 : *stup = state->memtuples[state->current - 1];
1546 30 : return true;
1547 : }
8575 tgl 1548 ECB : break;
8575 tgl 1549 EUB :
8575 tgl 1550 GIC 136497 : case TSS_SORTEDONTAPE:
256 akorotkov 1551 GNC 136497 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
2379 heikki.linnakangas 1552 GIC 136497 : Assert(state->slabAllocatorUsed);
1553 :
1554 : /*
1555 : * The slot that held the tuple that we returned in previous
1556 : * gettuple call can now be reused.
2379 heikki.linnakangas 1557 ECB : */
2379 heikki.linnakangas 1558 GBC 136497 : if (state->lastReturnedTuple)
1559 : {
2379 heikki.linnakangas 1560 GIC 76425 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1561 76425 : state->lastReturnedTuple = NULL;
1562 : }
2379 heikki.linnakangas 1563 ECB :
8575 tgl 1564 CBC 136497 : if (forward)
1565 : {
1566 136482 : if (state->eof_reached)
6251 tgl 1567 UIC 0 : return false;
1568 :
538 heikki.linnakangas 1569 CBC 136482 : if ((tuplen = getlen(state->result_tape, true)) != 0)
8575 tgl 1570 EUB : {
6251 tgl 1571 GIC 136470 : READTUP(state, stup, state->result_tape, tuplen);
1572 :
1573 : /*
1574 : * Remember the tuple we return, so that we can recycle
1575 : * its memory on next call. (This can be NULL, in the
1576 : * !state->tuples case).
1577 : */
2379 heikki.linnakangas 1578 136470 : state->lastReturnedTuple = stup->tuple;
2379 heikki.linnakangas 1579 ECB :
6251 tgl 1580 GIC 136470 : return true;
1581 : }
8575 tgl 1582 ECB : else
8575 tgl 1583 EUB : {
8575 tgl 1584 GIC 12 : state->eof_reached = true;
6251 1585 12 : return false;
1586 : }
1587 : }
8397 bruce 1588 ECB :
1589 : /*
1590 : * Backward.
8575 tgl 1591 : *
 1592 : * If all tuples have already been fetched, return the last tuple;
 6347 bruce 1593 : * otherwise, return the tuple before the last one returned.
1594 : */
8575 tgl 1595 GIC 15 : if (state->eof_reached)
1596 : {
1597 : /*
1598 : * Seek position is pointing just past the zero tuplen at the
6385 bruce 1599 ECB : * end of file; back up to fetch last tuple's ending length
1600 : * word. If seek fails we must have a completely empty file.
1601 : */
538 heikki.linnakangas 1602 GIC 6 : nmoved = LogicalTapeBackspace(state->result_tape,
2299 heikki.linnakangas 1603 ECB : 2 * sizeof(unsigned int));
2299 heikki.linnakangas 1604 GIC 6 : if (nmoved == 0)
6251 tgl 1605 UIC 0 : return false;
2299 heikki.linnakangas 1606 GIC 6 : else if (nmoved != 2 * sizeof(unsigned int))
2299 heikki.linnakangas 1607 UIC 0 : elog(ERROR, "unexpected tape position");
8575 tgl 1608 GIC 6 : state->eof_reached = false;
1609 : }
1610 : else
1611 : {
8575 tgl 1612 ECB : /*
1613 : * Back up and fetch previously-returned tuple's ending length
6385 bruce 1614 : * word. If seek fails, assume we are at start of file.
1615 : */
538 heikki.linnakangas 1616 GIC 9 : nmoved = LogicalTapeBackspace(state->result_tape,
2299 heikki.linnakangas 1617 ECB : sizeof(unsigned int));
2299 heikki.linnakangas 1618 GBC 9 : if (nmoved == 0)
6251 tgl 1619 UIC 0 : return false;
2299 heikki.linnakangas 1620 GIC 9 : else if (nmoved != sizeof(unsigned int))
2299 heikki.linnakangas 1621 UIC 0 : elog(ERROR, "unexpected tape position");
538 heikki.linnakangas 1622 GIC 9 : tuplen = getlen(state->result_tape, false);
1623 :
1624 : /*
1625 : * Back up to get ending length word of tuple before it.
1626 : */
1627 9 : nmoved = LogicalTapeBackspace(state->result_tape,
2118 tgl 1628 ECB : tuplen + 2 * sizeof(unsigned int));
2299 heikki.linnakangas 1629 GIC 9 : if (nmoved == tuplen + sizeof(unsigned int))
8575 tgl 1630 ECB : {
1631 : /*
2299 heikki.linnakangas 1632 : * We backed up over the previous tuple, but there was no
 1633 : * ending length word before it. That means that the previous
 1634 : * tuple is the first tuple in the file. It is now the next
 1635 : * to read in the forward direction (not obviously right, but
 1636 : * that is what the in-memory case does).
8575 tgl 1637 : */
6251 tgl 1638 CBC 3 : return false;
8575 tgl 1639 ECB : }
2299 heikki.linnakangas 1640 CBC 6 : else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2299 heikki.linnakangas 1641 LBC 0 : elog(ERROR, "bogus tuple length in backward scan");
8575 tgl 1642 ECB : }
1643 :
538 heikki.linnakangas 1644 CBC 12 : tuplen = getlen(state->result_tape, false);
8397 bruce 1645 ECB :
8575 tgl 1646 : /*
6385 bruce 1647 : * Now we have the length of the prior tuple, back up and read it.
1648 : * Note: READTUP expects we are positioned after the initial
6385 bruce 1649 EUB : * length word of the tuple, so back up to that point.
8575 tgl 1650 : */
538 heikki.linnakangas 1651 GIC 12 : nmoved = LogicalTapeBackspace(state->result_tape,
1652 : tuplen);
2299 1653 12 : if (nmoved != tuplen)
7198 tgl 1654 LBC 0 : elog(ERROR, "bogus tuple length in backward scan");
6251 tgl 1655 CBC 12 : READTUP(state, stup, state->result_tape, tuplen);
1656 :
1657 : /*
1658 : * Remember the tuple we return, so that we can recycle its memory
1659 : * on next call. (This can be NULL, in the Datum case).
1660 : */
2379 heikki.linnakangas 1661 12 : state->lastReturnedTuple = stup->tuple;
1662 :
6251 tgl 1663 12 : return true;
1664 :
8575 1665 2277760 : case TSS_FINALMERGE:
8575 tgl 1666 GIC 2277760 : Assert(forward);
2379 heikki.linnakangas 1667 ECB : /* We are managing memory ourselves, with the slab allocator. */
2379 heikki.linnakangas 1668 GIC 2277760 : Assert(state->slabAllocatorUsed);
8397 bruce 1669 ECB :
2379 heikki.linnakangas 1670 : /*
1671 : * The slab slot holding the tuple that we returned in previous
1672 : * gettuple call can now be reused.
1673 : */
2379 heikki.linnakangas 1674 CBC 2277760 : if (state->lastReturnedTuple)
1675 : {
2379 heikki.linnakangas 1676 GIC 2247626 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2379 heikki.linnakangas 1677 CBC 2247626 : state->lastReturnedTuple = NULL;
2379 heikki.linnakangas 1678 ECB : }
2379 heikki.linnakangas 1679 EUB :
8575 tgl 1680 : /*
1681 : * This code should match the inner loop of mergeonerun().
1682 : */
8562 tgl 1683 GIC 2277760 : if (state->memtupcount > 0)
8575 tgl 1684 ECB : {
538 heikki.linnakangas 1685 CBC 2277650 : int srcTapeIndex = state->memtuples[0].srctape;
538 heikki.linnakangas 1686 GIC 2277650 : LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1687 : SortTuple newtup;
1688 :
2379 1689 2277650 : *stup = state->memtuples[0];
1690 :
1691 : /*
2379 heikki.linnakangas 1692 ECB : * Remember the tuple we return, so that we can recycle its
1693 : * memory on next call. (This can be NULL, in the Datum case).
2579 rhaas 1694 : */
2379 heikki.linnakangas 1695 GIC 2277650 : state->lastReturnedTuple = stup->tuple;
2379 heikki.linnakangas 1696 ECB :
1697 : /*
1698 : * Pull next tuple from tape, and replace the returned tuple
1699 : * at top of the heap with it.
1700 : */
2379 heikki.linnakangas 1701 CBC 2277650 : if (!mergereadnext(state, srcTape, &newtup))
8575 tgl 1702 ECB : {
8397 bruce 1703 : /*
2379 heikki.linnakangas 1704 : * If no more data, we've reached end of run on this tape.
1705 : * Remove the top node from the heap.
1706 : */
2018 rhaas 1707 GIC 151 : tuplesort_heap_delete_top(state);
538 heikki.linnakangas 1708 CBC 151 : state->nInputRuns--;
8397 bruce 1709 ECB :
256 akorotkov 1710 EUB : /*
1711 : * Close the tape. It'd go away at the end of the sort
1712 : * anyway, but better to release the memory early.
1713 : */
256 akorotkov 1714 GIC 151 : LogicalTapeClose(srcTape);
256 akorotkov 1715 CBC 151 : return true;
256 akorotkov 1716 ECB : }
256 akorotkov 1717 GIC 2277499 : newtup.srctape = srcTapeIndex;
1718 2277499 : tuplesort_heap_replace_top(state, &newtup);
1719 2277499 : return true;
1720 : }
1721 110 : return false;
1722 :
256 akorotkov 1723 UIC 0 : default:
1724 0 : elog(ERROR, "invalid tuplesort state");
256 akorotkov 1725 ECB : return false; /* keep compiler quiet */
1726 : }
1727 : }
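
/*
 * Worked example of the on-tape layout the backward scan above relies on
 * (randomAccess case).  Each tuple is stored with its length word both
 * before and after the body, and a zero length word terminates the run:
 *
 *     [lenA][body A][lenA] [lenB][body B][lenB] [0]
 *
 * The trailing copies exist only so the tape can be read in reverse.
 * Having just returned B, the code backs up one word and reads B's
 * trailing length word, then backs up over B plus two length words,
 * which leaves it on A's trailing length word; reading that and backing
 * up by its value lands just past A's leading length word, where READTUP
 * expects to start (the stored length counts the length word itself).
 * If the long backspace comes up one word short, the tuple just backed
 * over was the first in the file, and the scan stops.
 */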
1728 :
1729 :
1730 : /*
1731 : * Advance over N tuples in either forward or back direction,
1732 : * without returning any data. N==0 is a no-op.
1733 : * Returns true if successful, false if ran out of tuples.
1734 : */
1735 : bool
3394 tgl 1736 CBC 196 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1737 : {
3393 tgl 1738 ECB : MemoryContext oldcontext;
1739 :
1740 : /*
3394 1741 : * We don't actually support backwards skip yet, because no callers need
3260 bruce 1742 : * it. The API is designed to allow for that later, though.
1743 : */
3394 tgl 1744 CBC 196 : Assert(forward);
3394 tgl 1745 GIC 196 : Assert(ntuples >= 0);
1892 rhaas 1746 196 : Assert(!WORKER(state));
1747 :
3394 tgl 1748 196 : switch (state->status)
1749 : {
3394 tgl 1750 CBC 184 : case TSS_SORTEDINMEM:
3394 tgl 1751 GIC 184 : if (state->memtupcount - state->current >= ntuples)
3394 tgl 1752 ECB : {
3394 tgl 1753 CBC 184 : state->current += ntuples;
1754 184 : return true;
1755 : }
3394 tgl 1756 UIC 0 : state->current = state->memtupcount;
1757 0 : state->eof_reached = true;
1758 :
1759 : /*
1760 : * Complain if caller tries to retrieve more tuples than
1761 : * originally asked for in a bounded sort. This is because
3394 tgl 1762 ECB : * returning EOF here might be the wrong thing.
1763 : */
3394 tgl 1764 LBC 0 : if (state->bounded && state->current >= state->bound)
3394 tgl 1765 UIC 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
3394 tgl 1766 ECB :
3394 tgl 1767 UIC 0 : return false;
1768 :
3394 tgl 1769 GIC 12 : case TSS_SORTEDONTAPE:
1770 : case TSS_FINALMERGE:
1771 :
3394 tgl 1772 ECB : /*
1773 : * We could probably optimize these cases better, but for now it's
1774 : * not worth the trouble.
1775 : */
256 akorotkov 1776 GNC 12 : oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
3394 tgl 1777 CBC 120066 : while (ntuples-- > 0)
1778 : {
3394 tgl 1779 ECB : SortTuple stup;
1780 :
2309 rhaas 1781 GIC 120054 : if (!tuplesort_gettuple_common(state, forward, &stup))
3393 tgl 1782 ECB : {
3393 tgl 1783 UIC 0 : MemoryContextSwitchTo(oldcontext);
3394 tgl 1784 LBC 0 : return false;
3393 tgl 1785 ECB : }
3394 tgl 1786 GIC 120054 : CHECK_FOR_INTERRUPTS();
3394 tgl 1787 ECB : }
3393 tgl 1788 GIC 12 : MemoryContextSwitchTo(oldcontext);
3394 1789 12 : return true;
3394 tgl 1790 ECB :
3394 tgl 1791 UIC 0 : default:
3394 tgl 1792 LBC 0 : elog(ERROR, "invalid tuplesort state");
3394 tgl 1793 ECB : return false; /* keep compiler quiet */
1794 : }
1795 : }
1796 :
1797 : /*
1798 : * tuplesort_merge_order - report merge order we'll use for given memory
1799 : * (note: "merge order" just means the number of input tapes in the merge).
6258 1800 : *
1801 : * This is exported for use by the planner. allowedMem is in bytes.
1802 : */
1803 : int
3566 noah 1804 GIC 7737 : tuplesort_merge_order(int64 allowedMem)
1805 : {
1806 : int mOrder;
6258 tgl 1807 ECB :
538 heikki.linnakangas 1808 : /*----------
1809 : * In the merge phase, we need buffer space for each input and output tape.
1810 : * Each pass in the balanced merge algorithm reads from M input tapes, and
1811 : * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1812 : * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1813 : * input tape.
1814 : *
1815 : * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1816 : * N * TAPE_BUFFER_OVERHEAD
1817 : *
1818 : * Except for the last and next-to-last merge passes, where there can be
1819 : * fewer tapes left to process, M = N. We choose M so that we have the
1820 : * desired amount of memory available for the input buffers
1821 : * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1822 : * available for the tape buffers (allowedMem).
1823 : *
1824 : * Note: you might be thinking we need to account for the memtuples[]
6031 bruce 1825 : * array in this calculation, but we effectively treat that as part of the
1826 : * MERGE_BUFFER_SIZE workspace.
1827 : *----------
1828 : */
538 heikki.linnakangas 1829 GIC 7737 : mOrder = allowedMem /
538 heikki.linnakangas 1830 ECB : (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
6258 tgl 1831 :
1832 : /*
2336 rhaas 1833 : * Even in minimum memory, use at least a MINORDER merge. On the other
1834 : * hand, even when we have lots of memory, do not use more than a MAXORDER
1835 : * merge. Tapes are pretty cheap, but they're not entirely free. Each
1836 : * additional tape reduces the amount of memory available to build runs,
1837 : * which in turn can cause the same sort to need more runs, which makes
1838 : * merging slower even if it can still be done in a single pass. Also,
1839 : * high order merges are quite slow due to CPU cache effects; it can be
538 heikki.linnakangas 1840 : * faster to pay the I/O cost of a multi-pass merge than to perform a
1841 : * single merge pass across many hundreds of tapes.
2336 rhaas 1842 : */
6251 tgl 1843 GIC 7737 : mOrder = Max(mOrder, MINORDER);
2336 rhaas 1844 CBC 7737 : mOrder = Min(mOrder, MAXORDER);
6258 tgl 1845 ECB :
6251 tgl 1846 CBC 7737 : return mOrder;
6258 tgl 1847 ECB : }
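
/*
 * Worked example (assuming the usual definitions earlier in this file:
 * TAPE_BUFFER_OVERHEAD = BLCKSZ = 8192 bytes, MERGE_BUFFER_SIZE =
 * BLCKSZ * 32 = 256 kB, MINORDER = 6, MAXORDER = 500; other builds may
 * differ).  With allowedMem = 4 MB:
 *
 *     mOrder = 4194304 / (2 * 8192 + 262144) = 4194304 / 278528 = 15
 *
 * so a 4 MB sort can merge up to 15 runs at a time.  Below roughly
 * 1.7 MB the MINORDER clamp still guarantees at least a 6-way merge,
 * even though the division would give less.
 */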
1848 :
538 heikki.linnakangas 1849 : /*
1850 : * Helper function to calculate how much memory to allocate for the read buffer
1851 : * of each input tape in a merge pass.
1852 : *
1853 : * 'avail_mem' is the amount of memory available for the buffers of all the
1854 : * tapes, both input and output.
1855 : * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1856 : * 'maxOutputTapes' is the max. number of output tapes we should produce.
1857 : */
1858 : static int64
538 heikki.linnakangas 1859 GIC 177 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
538 heikki.linnakangas 1860 ECB : int maxOutputTapes)
1861 : {
1862 : int nOutputRuns;
1863 : int nOutputTapes;
1864 :
1865 : /*
1866 : * How many output tapes will we produce in this pass?
1867 : *
1868 : * This is nInputRuns / nInputTapes, rounded up.
1869 : */
538 heikki.linnakangas 1870 GIC 177 : nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1871 :
538 heikki.linnakangas 1872 CBC 177 : nOutputTapes = Min(nOutputRuns, maxOutputTapes);
538 heikki.linnakangas 1873 ECB :
1874 : /*
1875 : * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1876 : * remaining memory is divided evenly between the input tapes.
1877 : *
1878 : * This also follows from the formula in tuplesort_merge_order, but here
1879 : * we derive the input buffer size from the amount of memory available,
1880 : * and M and N.
1881 : */
538 heikki.linnakangas 1882 GIC 177 : return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1883 : }
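
/*
 * Worked example of the formula above: with avail_mem = 4 MB,
 * nInputTapes = 6, nInputRuns = 13 and maxOutputTapes = 6, we get
 * nOutputRuns = ceil(13 / 6) = 3, hence nOutputTapes = 3, and each input
 * tape's read buffer is (4194304 - 3 * TAPE_BUFFER_OVERHEAD) / 6 bytes,
 * about 679 kB assuming TAPE_BUFFER_OVERHEAD = 8192.
 */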
538 heikki.linnakangas 1884 ECB :
1885 : /*
8575 tgl 1886 : * inittapes - initialize for tape sorting.
1887 : *
1888 : * This is called only if we have found we won't sort in memory.
1889 : */
1890 : static void
1892 rhaas 1891 GIC 263 : inittapes(Tuplesortstate *state, bool mergeruns)
8575 tgl 1892 ECB : {
1892 rhaas 1893 GIC 263 : Assert(!LEADER(state));
1894 :
1895 263 : if (mergeruns)
1896 : {
1897 : /* Compute number of input tapes to use when merging */
538 heikki.linnakangas 1898 58 : state->maxTapes = tuplesort_merge_order(state->allowedMem);
1892 rhaas 1899 ECB : }
1900 : else
1901 : {
1902 : /* Workers can sometimes produce single run, output without merge */
1892 rhaas 1903 CBC 205 : Assert(WORKER(state));
538 heikki.linnakangas 1904 GIC 205 : state->maxTapes = MINORDER;
1892 rhaas 1905 ECB : }
6258 tgl 1906 :
6397 1907 : #ifdef TRACE_SORT
6397 tgl 1908 CBC 263 : if (trace_sort)
1620 pg 1909 LBC 0 : elog(LOG, "worker %d switching to external sort with %d tapes: %s",
538 heikki.linnakangas 1910 ECB : state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
6397 tgl 1911 : #endif
1912 :
538 heikki.linnakangas 1913 : /* Create the tape set */
538 heikki.linnakangas 1914 GIC 263 : inittapestate(state, state->maxTapes);
1892 rhaas 1915 CBC 263 : state->tapeset =
538 heikki.linnakangas 1916 263 : LogicalTapeSetCreate(false,
1892 rhaas 1917 GIC 263 : state->shared ? &state->shared->fileset : NULL,
1918 : state->worker);
1919 :
2018 1920 263 : state->currentRun = 0;
1921 :
1922 : /*
1923 : * Initialize logical tape arrays.
8575 tgl 1924 ECB : */
538 heikki.linnakangas 1925 GIC 263 : state->inputTapes = NULL;
538 heikki.linnakangas 1926 CBC 263 : state->nInputTapes = 0;
538 heikki.linnakangas 1927 GIC 263 : state->nInputRuns = 0;
1928 :
538 heikki.linnakangas 1929 CBC 263 : state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
538 heikki.linnakangas 1930 GIC 263 : state->nOutputTapes = 0;
538 heikki.linnakangas 1931 CBC 263 : state->nOutputRuns = 0;
8575 tgl 1932 ECB :
8575 tgl 1933 GIC 263 : state->status = TSS_BUILDRUNS;
538 heikki.linnakangas 1934 ECB :
538 heikki.linnakangas 1935 GIC 263 : selectnewtape(state);
8575 tgl 1936 263 : }
1937 :
1938 : /*
1939 : * inittapestate - initialize generic tape management state
1940 : */
1941 : static void
1892 rhaas 1942 CBC 334 : inittapestate(Tuplesortstate *state, int maxTapes)
1943 : {
1944 : int64 tapeSpace;
1945 :
1892 rhaas 1946 ECB : /*
1947 : * Decrease availMem to reflect the space needed for tape buffers; but
1892 rhaas 1948 EUB : * don't decrease it to the point that we have no room for tuples. (That
1892 rhaas 1949 ECB : * case is only likely to occur if sorting pass-by-value Datums; in all
1892 rhaas 1950 EUB : * other scenarios the memtuples[] array is unlikely to occupy more than
1892 rhaas 1951 ECB : * half of allowedMem. In the pass-by-value case it's not important to
1952 : * account for tuple space, so we don't care if LACKMEM becomes
1953 : * inaccurate.)
1954 : */
1892 rhaas 1955 CBC 334 : tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1956 :
1957 334 : if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1892 rhaas 1958 GIC 301 : USEMEM(state, tapeSpace);
1892 rhaas 1959 ECB :
1960 : /*
1961 : * Make sure that the temp file(s) underlying the tape set are created in
1962 : * suitable temp tablespaces. For parallel sorts, this should have been
1963 : * called already, but it doesn't matter if it is called a second time.
1964 : */
1892 rhaas 1965 GIC 334 : PrepareTempTablespaces();
1966 334 : }
1967 :
1968 : /*
538 heikki.linnakangas 1969 ECB : * selectnewtape -- select next tape to output to.
1970 : *
1971 : * This is called after finishing a run when we know another run
1972 : * must be started. This is used both when building the initial
1973 : * runs, and during merge passes.
1974 : */
1975 : static void
8575 tgl 1976 GIC 1825 : selectnewtape(Tuplesortstate *state)
8575 tgl 1977 ECB : {
1978 : /*
531 heikki.linnakangas 1979 : * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
531 heikki.linnakangas 1980 EUB : * both zero. On each call, we create a new output tape to hold the next
1981 : * run, until maxTapes is reached. After that, we assign new runs to the
 1982 : * existing tapes in a round-robin fashion.
531 heikki.linnakangas 1983 ECB : */
531 heikki.linnakangas 1984 GIC 1825 : if (state->nOutputTapes < state->maxTapes)
8575 tgl 1985 ECB : {
1986 : /* Create a new tape to hold the next run */
538 heikki.linnakangas 1987 CBC 673 : Assert(state->outputTapes[state->nOutputRuns] == NULL);
538 heikki.linnakangas 1988 GIC 673 : Assert(state->nOutputRuns == state->nOutputTapes);
1989 673 : state->destTape = LogicalTapeCreate(state->tapeset);
531 1990 673 : state->outputTapes[state->nOutputTapes] = state->destTape;
538 1991 673 : state->nOutputTapes++;
1992 673 : state->nOutputRuns++;
1993 : }
1994 : else
1995 : {
1996 : /*
1997 : * We have reached the max number of tapes. Append to an existing
1998 : * tape.
1999 : */
2000 1152 : state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
2001 1152 : state->nOutputRuns++;
2002 : }
8575 tgl 2003 CBC 1825 : }
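
/*
 * Worked example of the assignment above, with maxTapes = 4: runs 0-3
 * each get a fresh tape (nOutputTapes grows to 4); from run 4 onward,
 * run k is appended to tape k % 4, so run 4 -> tape 0, run 5 -> tape 1,
 * and so on.  Having multiple runs per tape is what forces the extra
 * merge passes mentioned in the file header.
 */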
2004 :
2005 : /*
2006 : * Initialize the slab allocation arena, for the given number of slots.
2379 heikki.linnakangas 2007 ECB : */
2008 : static void
2379 heikki.linnakangas 2009 GIC 129 : init_slab_allocator(Tuplesortstate *state, int numSlots)
2379 heikki.linnakangas 2010 ECB : {
2379 heikki.linnakangas 2011 CBC 129 : if (numSlots > 0)
2012 : {
2379 heikki.linnakangas 2013 ECB : char *p;
2014 : int i;
2015 :
2379 heikki.linnakangas 2016 GIC 123 : state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2017 123 : state->slabMemoryEnd = state->slabMemoryBegin +
2018 123 : numSlots * SLAB_SLOT_SIZE;
2019 123 : state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2020 123 : USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2021 :
2022 123 : p = state->slabMemoryBegin;
2023 510 : for (i = 0; i < numSlots - 1; i++)
2379 heikki.linnakangas 2024 ECB : {
2379 heikki.linnakangas 2025 GIC 387 : ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2026 387 : p += SLAB_SLOT_SIZE;
2027 : }
2379 heikki.linnakangas 2028 CBC 123 : ((SlabSlot *) p)->nextfree = NULL;
2029 : }
2379 heikki.linnakangas 2030 ECB : else
2031 : {
2379 heikki.linnakangas 2032 CBC 6 : state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2033 6 : state->slabFreeHead = NULL;
2379 heikki.linnakangas 2034 ECB : }
2379 heikki.linnakangas 2035 CBC 129 : state->slabAllocatorUsed = true;
2379 heikki.linnakangas 2036 GIC 129 : }
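
/*
 * Illustrative sketch (hypothetical, not part of tuplesort.c): the same
 * intrusive free-list scheme as init_slab_allocator() above, reduced to
 * a standalone fixed-size-slot allocator (numSlots >= 1 assumed).  Each
 * free slot stores the pointer to the next free slot inside its own
 * memory, so the free list costs no extra space and alloc/free are O(1)
 * pointer swaps.
 */
#include <stdlib.h>

#define SLOT_SIZE 1024          /* stand-in for SLAB_SLOT_SIZE */

typedef union slab_slot
{
    union slab_slot *nextfree;
    char        buffer[SLOT_SIZE];
} slab_slot;

typedef struct slab
{
    char       *begin;
    slab_slot  *freehead;
} slab;

static void
slab_init(slab *s, int numSlots)
{
    s->begin = malloc((size_t) numSlots * SLOT_SIZE);
    s->freehead = (slab_slot *) s->begin;

    /* chain every slot onto the free list, as the loop above does */
    char *p = s->begin;

    for (int i = 0; i < numSlots - 1; i++)
    {
        ((slab_slot *) p)->nextfree = (slab_slot *) (p + SLOT_SIZE);
        p += SLOT_SIZE;
    }
    ((slab_slot *) p)->nextfree = NULL;
}

static void *
slab_alloc(slab *s)
{
    slab_slot  *slot = s->freehead;

    if (slot)
        s->freehead = slot->nextfree;
    return slot;                /* NULL when the arena is exhausted */
}

static void
slab_free(slab *s, void *ptr)
{
    slab_slot  *slot = (slab_slot *) ptr;

    slot->nextfree = s->freehead;
    s->freehead = slot;
}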
2379 heikki.linnakangas 2037 ECB :
2038 : /*
8575 tgl 2039 : * mergeruns -- merge all the completed initial runs.
2040 : *
2041 : * This implements the Balanced k-Way Merge Algorithm. All input data has
2042 : * already been written to initial runs on tape (see dumptuples).
2043 : */
2044 : static void
8575 tgl 2045 GIC 129 : mergeruns(Tuplesortstate *state)
2046 : {
538 heikki.linnakangas 2047 ECB : int tapenum;
2048 :
8575 tgl 2049 GIC 129 : Assert(state->status == TSS_BUILDRUNS);
8562 tgl 2050 CBC 129 : Assert(state->memtupcount == 0);
8397 bruce 2051 ECB :
256 akorotkov 2052 GNC 129 : if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2053 : {
2054 : /*
2055 : * If there are multiple runs to be merged, when we go to read back
2056 : * tuples from disk, abbreviated keys will not have been stored, and
2057 : * we don't care to regenerate them. Disable abbreviation from this
2058 : * point on.
2059 : */
2060 15 : state->base.sortKeys->abbrev_converter = NULL;
2061 15 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2062 :
2063 : /* Not strictly necessary, but be tidy */
2064 15 : state->base.sortKeys->abbrev_abort = NULL;
2065 15 : state->base.sortKeys->abbrev_full_comparator = NULL;
2066 : }
2998 rhaas 2067 ECB :
2068 : /*
2379 heikki.linnakangas 2069 : * Reset tuple memory. We've freed all the tuples that we previously
2070 : * allocated. We will use the slab allocator from now on.
2071 : */
256 akorotkov 2072 GNC 129 : MemoryContextResetOnly(state->base.tuplecontext);
2073 :
2379 heikki.linnakangas 2074 ECB : /*
2075 : * We no longer need a large memtuples array. (We will allocate a smaller
2076 : * one for the heap later.)
2077 : */
2379 heikki.linnakangas 2078 CBC 129 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2379 heikki.linnakangas 2079 GIC 129 : pfree(state->memtuples);
2080 129 : state->memtuples = NULL;
2081 :
2082 : /*
2083 : * Initialize the slab allocator. We need one slab slot per input tape,
2084 : * for the tuples in the heap, plus one to hold the tuple last returned
2085 : * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
 2086 : * however, we don't need to allocate anything.)
2087 : *
2088 : * In a multi-pass merge, we could shrink this allocation for the last
2089 : * merge pass, if it has fewer tapes than previous passes, but we don't
2090 : * bother.
2091 : *
2092 : * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2093 : * to track memory usage of individual tuples.
2094 : */
256 akorotkov 2095 GNC 129 : if (state->base.tuples)
538 heikki.linnakangas 2096 GIC 123 : init_slab_allocator(state, state->nOutputTapes + 1);
2379 heikki.linnakangas 2097 ECB : else
2379 heikki.linnakangas 2098 GIC 6 : init_slab_allocator(state, 0);
2099 :
2379 heikki.linnakangas 2100 ECB : /*
2313 2101 : * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2102 : * from each input tape.
2103 : *
2104 : * We could shrink this, too, between passes in a multi-pass merge, but we
2105 : * don't bother. (The initial input tapes are still in outputTapes. The
2106 : * number of input tapes will not increase between passes.)
2107 : */
538 heikki.linnakangas 2108 CBC 129 : state->memtupsize = state->nOutputTapes;
256 akorotkov 2109 GNC 258 : state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
538 heikki.linnakangas 2110 CBC 129 : state->nOutputTapes * sizeof(SortTuple));
2313 2111 129 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2112 :
2113 : /*
2114 : * Use all the remaining memory we have available for tape buffers among
2115 : * all the input tapes. At the beginning of each merge pass, we will
2116 : * divide this memory between the input and output tapes in the pass.
2379 heikki.linnakangas 2117 ECB : */
538 heikki.linnakangas 2118 GIC 129 : state->tape_buffer_mem = state->availMem;
531 2119 129 : USEMEM(state, state->tape_buffer_mem);
2370 heikki.linnakangas 2120 ECB : #ifdef TRACE_SORT
2370 heikki.linnakangas 2121 CBC 129 : if (trace_sort)
538 heikki.linnakangas 2122 LBC 0 : elog(LOG, "worker %d using %zu KB of memory for tape buffers",
538 heikki.linnakangas 2123 ECB : state->worker, state->tape_buffer_mem / 1024);
2370 2124 : #endif
2125 :
2126 : for (;;)
2127 : {
2128 : /*
2129 : * On the first iteration, or if we have read all the runs from the
2130 : * input tapes in a multi-pass merge, it's time to start a new pass.
2131 : * Rewind all the output tapes, and make them inputs for the next
2132 : * pass.
6242 tgl 2133 : */
538 heikki.linnakangas 2134 GIC 390 : if (state->nInputRuns == 0)
8575 tgl 2135 ECB : {
538 heikki.linnakangas 2136 : int64 input_buffer_size;
8575 tgl 2137 :
2138 : /* Close the old, emptied, input tapes */
538 heikki.linnakangas 2139 CBC 177 : if (state->nInputTapes > 0)
8575 tgl 2140 ECB : {
538 heikki.linnakangas 2141 CBC 336 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
538 heikki.linnakangas 2142 GIC 288 : LogicalTapeClose(state->inputTapes[tapenum]);
2143 48 : pfree(state->inputTapes);
2144 : }
2145 :
2146 : /* Previous pass's outputs become next pass's inputs. */
2147 177 : state->inputTapes = state->outputTapes;
2148 177 : state->nInputTapes = state->nOutputTapes;
2149 177 : state->nInputRuns = state->nOutputRuns;
2150 :
2151 : /*
2152 : * Reset output tape variables. The actual LogicalTapes will be
2153 : * created as needed, here we only allocate the array to hold
2154 : * them.
538 heikki.linnakangas 2155 ECB : */
538 heikki.linnakangas 2156 GIC 177 : state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
538 heikki.linnakangas 2157 CBC 177 : state->nOutputTapes = 0;
2158 177 : state->nOutputRuns = 0;
2159 :
2160 : /*
2161 : * Redistribute the memory allocated for tape buffers, among the
538 heikki.linnakangas 2162 ECB : * new input and output tapes.
2163 : */
538 heikki.linnakangas 2164 GIC 177 : input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
538 heikki.linnakangas 2165 ECB : state->nInputTapes,
2166 : state->nInputRuns,
2167 : state->maxTapes);
2168 :
2169 : #ifdef TRACE_SORT
538 heikki.linnakangas 2170 GBC 177 : if (trace_sort)
538 heikki.linnakangas 2171 UIC 0 : elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2172 : state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2173 : pg_rusage_show(&state->ru_start));
2174 : #endif
2175 :
2176 : /* Prepare the new input tapes for merge pass. */
538 heikki.linnakangas 2177 CBC 777 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2178 600 : LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2179 :
2180 : /*
2181 : * If there's just one run left on each input tape, then only one
2182 : * merge pass remains. If we don't have to produce a materialized
2183 : * sorted tape, we can stop at this point and do the final merge
538 heikki.linnakangas 2184 ECB : * on-the-fly.
2185 : */
256 akorotkov 2186 GNC 177 : if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
370 drowley 2187 GIC 162 : && state->nInputRuns <= state->nInputTapes
538 heikki.linnakangas 2188 120 : && !WORKER(state))
2189 : {
2190 : /* Tell logtape.c we won't be writing anymore */
6242 tgl 2191 119 : LogicalTapeSetForgetFreeSpace(state->tapeset);
2192 : /* Initialize for the final merge pass */
2379 heikki.linnakangas 2193 119 : beginmerge(state);
8575 tgl 2194 CBC 119 : state->status = TSS_FINALMERGE;
2195 119 : return;
8575 tgl 2196 ECB : }
2197 : }
6242 2198 :
538 heikki.linnakangas 2199 : /* Select an output tape */
538 heikki.linnakangas 2200 CBC 271 : selectnewtape(state);
2201 :
538 heikki.linnakangas 2202 ECB : /* Merge one run from each input tape. */
538 heikki.linnakangas 2203 GIC 271 : mergeonerun(state);
8397 bruce 2204 ECB :
2205 : /*
 2206 : * If the input tapes are empty and we produced at most one output run,
538 heikki.linnakangas 2207 : * we're done. The current output tape contains the final result.
8397 bruce 2208 : */
538 heikki.linnakangas 2209 GIC 271 : if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2210 10 : break;
2211 : }
2212 :
2213 : /*
538 heikki.linnakangas 2214 ECB : * Done. The result is on a single run on a single tape.
2215 : */
538 heikki.linnakangas 2216 CBC 10 : state->result_tape = state->outputTapes[0];
1892 rhaas 2217 GIC 10 : if (!WORKER(state))
538 heikki.linnakangas 2218 CBC 9 : LogicalTapeFreeze(state->result_tape, NULL);
1892 rhaas 2219 ECB : else
1892 rhaas 2220 CBC 1 : worker_freeze_result_tape(state);
8575 tgl 2221 GIC 10 : state->status = TSS_SORTEDONTAPE;
2379 heikki.linnakangas 2222 ECB :
2223 : /* Close all the now-empty input tapes, to release their read buffers. */
538 heikki.linnakangas 2224 GIC 45 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
538 heikki.linnakangas 2225 CBC 35 : LogicalTapeClose(state->inputTapes[tapenum]);
2226 : }
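
/*
 * Worked example of the pass structure above: with 40 initial runs and 6
 * input tapes, the first pass merges them into ceil(40 / 6) = 7 runs,
 * the second into ceil(7 / 6) = 2, and the third pass (or the on-the-fly
 * final merge, when no materialized result tape is required) yields the
 * single sorted output.  In general a balanced M-way merge needs about
 * ceil(log_M(initial runs)) passes.
 */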
8575 tgl 2227 ECB :
8575 tgl 2228 EUB : /*
538 heikki.linnakangas 2229 ECB : * Merge one run from each input tape.
8575 tgl 2230 EUB : */
2231 : static void
8575 tgl 2232 CBC 271 : mergeonerun(Tuplesortstate *state)
2233 : {
2234 : int srcTapeIndex;
2235 : LogicalTape *srcTape;
2236 :
8575 tgl 2237 ECB : /*
2238 : * Start the merge by loading one tuple from each active source tape into
538 heikki.linnakangas 2239 : * the heap.
8575 tgl 2240 : */
2379 heikki.linnakangas 2241 GIC 271 : beginmerge(state);
8575 tgl 2242 ECB :
220 drowley 2243 GNC 271 : Assert(state->slabAllocatorUsed);
2244 :
8575 tgl 2245 ECB : /*
6385 bruce 2246 : * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2247 : * out, and replacing it with next tuple from same tape (if there is
2248 : * another one).
8575 tgl 2249 : */
8562 tgl 2250 GIC 1582631 : while (state->memtupcount > 0)
2251 : {
2252 : SortTuple stup;
2253 :
8562 tgl 2254 ECB : /* write the tuple to destTape */
538 heikki.linnakangas 2255 GIC 1582360 : srcTapeIndex = state->memtuples[0].srctape;
538 heikki.linnakangas 2256 CBC 1582360 : srcTape = state->inputTapes[srcTapeIndex];
2257 1582360 : WRITETUP(state, state->destTape, &state->memtuples[0]);
2258 :
2379 heikki.linnakangas 2259 ECB : /* recycle the slot of the tuple we just wrote out, for the next read */
2274 tgl 2260 CBC 1582360 : if (state->memtuples[0].tuple)
2261 1522270 : RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2401 heikki.linnakangas 2262 ECB :
2263 : /*
2379 2264 : * pull next tuple from the tape, and replace the written-out tuple in
2265 : * the heap with it.
2266 : */
2379 heikki.linnakangas 2267 GIC 1582360 : if (mergereadnext(state, srcTape, &stup))
2268 : {
538 2269 1580885 : stup.srctape = srcTapeIndex;
2018 rhaas 2270 1580885 : tuplesort_heap_replace_top(state, &stup);
2271 : }
2272 : else
2273 : {
2274 1475 : tuplesort_heap_delete_top(state);
538 heikki.linnakangas 2275 1475 : state->nInputRuns--;
2276 : }
2277 : }
2278 :
2279 : /*
2280 : * When the heap empties, we're done. Write an end-of-run marker on the
2281 : * output tape.
2282 : */
2283 271 : markrunend(state->destTape);
8575 tgl 2284 271 : }
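
/*
 * Illustrative sketch (hypothetical, not part of tuplesort.c): the loop
 * in mergeonerun() above, reduced to merging k sorted int arrays with a
 * binary min-heap keyed on each source's current value.  Like the real
 * code, it pops the smallest element, emits it, and replaces the heap
 * top with the next element from the same source, deleting the top when
 * that source is exhausted.  Every source is assumed non-empty.
 */
typedef struct merge_src
{
    const int  *vals;
    int         pos;
    int         len;
} merge_src;

static void
heap_siftdown(merge_src **heap, int n, int i)
{
    for (;;)
    {
        int l = 2 * i + 1, r = 2 * i + 2, min = i;

        if (l < n && heap[l]->vals[heap[l]->pos] < heap[min]->vals[heap[min]->pos])
            min = l;
        if (r < n && heap[r]->vals[heap[r]->pos] < heap[min]->vals[heap[min]->pos])
            min = r;
        if (min == i)
            break;
        merge_src *tmp = heap[i]; heap[i] = heap[min]; heap[min] = tmp;
        i = min;
    }
}

/* Merge k sorted sources into out[]; returns the number of values written. */
static int
kway_merge(merge_src **heap, int k, int *out)
{
    int written = 0;

    /* build the initial heap, as beginmerge() does with one tuple per tape */
    for (int i = k / 2 - 1; i >= 0; i--)
        heap_siftdown(heap, k, i);

    while (k > 0)
    {
        merge_src  *top = heap[0];

        out[written++] = top->vals[top->pos++];     /* the "WRITETUP" step */
        if (top->pos >= top->len)
            heap[0] = heap[--k];                    /* delete-top */
        heap_siftdown(heap, k, 0);                  /* replace-top */
    }
    return written;
}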
2285 :
2286 : /*
2287 : * beginmerge - initialize for a merge pass
2288 : *
2289 : * Fill the merge heap with the first tuple from each input tape.
2290 : */
2291 : static void
2379 heikki.linnakangas 2292 390 : beginmerge(Tuplesortstate *state)
2293 : {
2294 : int activeTapes;
2295 : int srcTapeIndex;
2296 :
2297 : /* Heap should be empty here */
8562 tgl 2298 390 : Assert(state->memtupcount == 0);
2299 :
538 heikki.linnakangas 2300 390 : activeTapes = Min(state->nInputTapes, state->nInputRuns);
2301 :
2302 2142 : for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2303 : {
2304 : SortTuple tup;
2305 :
2306 1752 : if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2307 : {
2308 1641 : tup.srctape = srcTapeIndex;
2018 rhaas 2309 1641 : tuplesort_heap_insert(state, &tup);
2310 : }
2311 : }
2579 2312 390 : }
2313 :
2314 : /*
2315 : * mergereadnext - read next tuple from one merge input tape
2316 : *
2317 : * Returns false on EOF.
2318 : */
2319 : static bool
538 heikki.linnakangas 2320 3861762 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2321 : {
2322 : unsigned int tuplen;
2323 :
2324 : /* read next tuple, if any */
2325 3861762 : if ((tuplen = getlen(srcTape, true)) == 0)
2379 2326 1737 : return false;
2327 3860025 : READTUP(state, stup, srcTape, tuplen);
2328 :
2329 3860025 : return true;
2330 : }
2331 :
2332 : /*
2333 : * dumptuples - remove tuples from memtuples and write initial run to tape
2334 : *
2335 : * When alltuples = true, dump everything currently in memory. (This case is
2336 : * only used at end of input data.)
2337 : */
2338 : static void
8575 tgl 2339 586635 : dumptuples(Tuplesortstate *state, bool alltuples)
2340 : {
2341 : int memtupwrite;
2342 : int i;
2343 :
2344 : /*
2345 : * Nothing to do if we still fit in available memory and have array slots,
2346 : * unless this is the final call during initial run generation.
2347 : */
2018 rhaas 2348 586635 : if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2349 585344 : !alltuples)
2350 585081 : return;
2351 :
2352 : /*
2353 : * Final call might require no sorting, in rare cases where we just so
2354 : * happen to have previously LACKMEM()'d at the point where exactly all
2355 : * remaining tuples are loaded into memory, just before input was
2356 : * exhausted. In general, short final runs are quite possible, but avoid
2357 : * creating a completely empty run. In a worker, though, we must produce
2358 : * at least one tape, even if it's empty.
2359 : */
538 heikki.linnakangas 2360 1554 : if (state->memtupcount == 0 && state->currentRun > 0)
538 heikki.linnakangas 2361 UIC 0 : return;
2362 :
2557 rhaas 2363 GIC 1554 : Assert(state->status == TSS_BUILDRUNS);
2364 :
2365 : /*
2366 : * It seems unlikely that this limit will ever be exceeded, but take no
2367 : * chances
2368 : */
2369 1554 : if (state->currentRun == INT_MAX)
2557 rhaas 2370 UIC 0 : ereport(ERROR,
2371 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2372 : errmsg("cannot have more than %d runs for an external sort",
2373 : INT_MAX)));
2374 :
538 heikki.linnakangas 2375 GIC 1554 : if (state->currentRun > 0)
2376 1291 : selectnewtape(state);
2377 :
2557 rhaas 2378 1554 : state->currentRun++;
2379 :
2380 : #ifdef TRACE_SORT
2381 1554 : if (trace_sort)
1620 pg 2382 UIC 0 : elog(LOG, "worker %d starting quicksort of run %d: %s",
2383 : state->worker, state->currentRun,
2384 : pg_rusage_show(&state->ru_start));
2385 : #endif
2386 :
2387 : /*
2388 : * Sort all tuples accumulated within the allowed amount of memory for
2389 : * this run using quicksort
2390 : */
2557 rhaas 2391 GIC 1554 : tuplesort_sort_memtuples(state);
2392 :
2393 : #ifdef TRACE_SORT
2394 1554 : if (trace_sort)
1620 pg 2395 UIC 0 : elog(LOG, "worker %d finished quicksort of run %d: %s",
2396 : state->worker, state->currentRun,
2397 : pg_rusage_show(&state->ru_start));
2398 : #endif
2399 :
2557 rhaas 2400 GIC 1554 : memtupwrite = state->memtupcount;
2401 2526830 : for (i = 0; i < memtupwrite; i++)
2402 : {
220 drowley 2403 GNC 2525276 : SortTuple *stup = &state->memtuples[i];
2404 :
2405 2525276 : WRITETUP(state, state->destTape, stup);
2406 :
2407 : /*
2408 : * Account for freeing the tuple, but no need to do the actual pfree
2409 : * since the tuplecontext is being reset after the loop.
2410 : */
2411 2525276 : if (stup->tuple != NULL)
2412 2435219 : FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
2413 : }
2414 :
2415 1554 : state->memtupcount = 0;
2416 :
2417 : /*
2418 : * Reset tuple memory. We've freed all of the tuples that we previously
2419 : * allocated. It's important to avoid fragmentation when there is a stark
2420 : * change in the sizes of incoming tuples. Fragmentation due to
2421 : * AllocSetFree's bucketing by size class might be particularly bad if
2422 : * this step wasn't taken.
2423 : */
256 akorotkov 2424 1554 : MemoryContextReset(state->base.tuplecontext);
2425 :
538 heikki.linnakangas 2426 GIC 1554 : markrunend(state->destTape);
2427 :
2428 : #ifdef TRACE_SORT
2557 rhaas 2429 1554 : if (trace_sort)
1620 pg 2430 UIC 0 : elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2431 : state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2432 : pg_rusage_show(&state->ru_start));
2433 : #endif
2434 : }
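
/*
 * Worked example of run generation above: sorting roughly 10 GB of input
 * with workMem = 1 GB quicksorts and dumps about ten initial runs of
 * roughly 1 GB each.  Since tuplesort_merge_order() allows far more than
 * ten input tapes at that memory budget, mergeruns() can then combine
 * them in a single merge pass.
 */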
2435 :
2436 : /*
2437 : * tuplesort_rescan - rewind and replay the scan
2438 : */
2439 : void
8575 tgl 2440 GIC 29 : tuplesort_rescan(Tuplesortstate *state)
2441 : {
256 akorotkov 2442 GNC 29 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2443 :
2444 29 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2445 :
8575 tgl 2446 GIC 29 : switch (state->status)
2447 : {
2448 26 : case TSS_SORTEDINMEM:
2449 26 : state->current = 0;
2450 26 : state->eof_reached = false;
2451 26 : state->markpos_offset = 0;
2452 26 : state->markpos_eof = false;
2453 26 : break;
2454 3 : case TSS_SORTEDONTAPE:
538 heikki.linnakangas 2455 3 : LogicalTapeRewindForRead(state->result_tape, 0);
8575 tgl 2456 3 : state->eof_reached = false;
2457 3 : state->markpos_block = 0L;
2458 3 : state->markpos_offset = 0;
2459 3 : state->markpos_eof = false;
2460 3 : break;
8575 tgl 2461 UIC 0 : default:
7198 2462 0 : elog(ERROR, "invalid tuplesort state");
2463 : break;
2464 : }
2465 :
6251 tgl 2466 GIC 29 : MemoryContextSwitchTo(oldcontext);
8575 2467 29 : }
2468 :
2469 : /*
2470 : * tuplesort_markpos - saves current position in the merged sort file
2471 : */
2472 : void
2473 284485 : tuplesort_markpos(Tuplesortstate *state)
2474 : {
256 akorotkov 2475 GNC 284485 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2476 :
2477 284485 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2478 :
8575 tgl 2479 GIC 284485 : switch (state->status)
2480 : {
2481 280081 : case TSS_SORTEDINMEM:
2482 280081 : state->markpos_offset = state->current;
2483 280081 : state->markpos_eof = state->eof_reached;
2484 280081 : break;
2485 4404 : case TSS_SORTEDONTAPE:
538 heikki.linnakangas 2486 4404 : LogicalTapeTell(state->result_tape,
2487 : &state->markpos_block,
2488 : &state->markpos_offset);
8575 tgl 2489 4404 : state->markpos_eof = state->eof_reached;
2490 4404 : break;
8575 tgl 2491 UIC 0 : default:
7198 2492 0 : elog(ERROR, "invalid tuplesort state");
2493 : break;
2494 : }
2495 :
6251 tgl 2496 GIC 284485 : MemoryContextSwitchTo(oldcontext);
8575 2497 284485 : }
2498 :
2499 : /*
2500 : * tuplesort_restorepos - restores current position in merged sort file to
2501 : * last saved position
2502 : */
2503 : void
2504 14909 : tuplesort_restorepos(Tuplesortstate *state)
2505 : {
256 akorotkov 2506 GNC 14909 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2507 :
2508 14909 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2509 :
8575 tgl 2510 GIC 14909 : switch (state->status)
2511 : {
2512 11813 : case TSS_SORTEDINMEM:
2513 11813 : state->current = state->markpos_offset;
2514 11813 : state->eof_reached = state->markpos_eof;
2515 11813 : break;
2516 3096 : case TSS_SORTEDONTAPE:
538 heikki.linnakangas 2517 3096 : LogicalTapeSeek(state->result_tape,
2518 : state->markpos_block,
2519 : state->markpos_offset);
8575 tgl 2520 3096 : state->eof_reached = state->markpos_eof;
2521 3096 : break;
8575 tgl 2522 UIC 0 : default:
7198 2523 0 : elog(ERROR, "invalid tuplesort state");
2524 : break;
2525 : }
2526 :
6251 tgl 2527 GIC 14909 : MemoryContextSwitchTo(oldcontext);
8575 2528 14909 : }
2529 :
2530 : /*
2531 : * tuplesort_get_stats - extract summary statistics
2532 : *
2533 : * This can be called after tuplesort_performsort() finishes to obtain
2534 : * printable summary information about how the sort was performed.
2535 : */
2536 : void
4990 2537 180 : tuplesort_get_stats(Tuplesortstate *state,
2538 : TuplesortInstrumentation *stats)
2539 : {
2540 : /*
2541 : * Note: it might seem we should provide both memory and disk usage for a
2542 : * disk-based sort. However, the current code doesn't track memory space
2543 : * accurately once we have begun to return tuples to the caller (since we
2544 : * don't account for pfree's the caller is expected to do), so we cannot
2545 : * rely on availMem in a disk sort. This does not seem worth the overhead
2546 : * to fix. Is it worth creating an API for the memory context code to
2547 : * tell us how much is actually used in sortcontext?
2548 : */
1098 tomas.vondra 2549 180 : tuplesort_updatemax(state);
2550 :
2551 180 : if (state->isMaxSpaceDisk)
2049 rhaas 2552 UIC 0 : stats->spaceType = SORT_SPACE_TYPE_DISK;
2553 : else
2049 rhaas 2554 GIC 180 : stats->spaceType = SORT_SPACE_TYPE_MEMORY;
1098 tomas.vondra 2555 180 : stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2556 :
2557 180 : switch (state->maxSpaceStatus)
2558 : {
5819 tgl 2559 180 : case TSS_SORTEDINMEM:
2560 180 : if (state->boundUsed)
2049 rhaas 2561 21 : stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2562 : else
2563 159 : stats->sortMethod = SORT_TYPE_QUICKSORT;
5819 tgl 2564 180 : break;
5819 tgl 2565 UIC 0 : case TSS_SORTEDONTAPE:
2049 rhaas 2566 0 : stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
5819 tgl 2567 0 : break;
2568 0 : case TSS_FINALMERGE:
2049 rhaas 2569 0 : stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
5819 tgl 2570 0 : break;
2571 0 : default:
2049 rhaas 2572 0 : stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
5819 tgl 2573 0 : break;
2574 : }
5819 tgl 2575 GIC 180 : }
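
/*
 * Illustrative sketch (hypothetical caller, in the style of EXPLAIN
 * ANALYZE's sort instrumentation): how the stats above are typically
 * consumed, using the name helpers defined just below.
 */
static void
report_sort(Tuplesortstate *sortstate)
{
    TuplesortInstrumentation stats;

    tuplesort_get_stats(sortstate, &stats);
    elog(LOG, "Sort Method: %s  %s: %ldkB",
         tuplesort_method_name(stats.sortMethod),
         tuplesort_space_type_name(stats.spaceType),
         (long) stats.spaceUsed);
}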
2576 :
2577 : /*
2578 : * Convert TuplesortMethod to a string.
2579 : */
2580 : const char *
2049 rhaas 2581 129 : tuplesort_method_name(TuplesortMethod m)
2582 : {
2583 129 : switch (m)
2584 : {
2049 rhaas 2585 UIC 0 : case SORT_TYPE_STILL_IN_PROGRESS:
2586 0 : return "still in progress";
2049 rhaas 2587 GIC 21 : case SORT_TYPE_TOP_N_HEAPSORT:
2588 21 : return "top-N heapsort";
2589 108 : case SORT_TYPE_QUICKSORT:
2590 108 : return "quicksort";
2049 rhaas 2591 UIC 0 : case SORT_TYPE_EXTERNAL_SORT:
2592 0 : return "external sort";
2593 0 : case SORT_TYPE_EXTERNAL_MERGE:
2594 0 : return "external merge";
2595 : }
2596 :
2597 0 : return "unknown";
2598 : }
2599 :
2600 : /*
2601 : * Convert TuplesortSpaceType to a string.
2602 : */
2603 : const char *
2049 rhaas 2604 GIC 111 : tuplesort_space_type_name(TuplesortSpaceType t)
2605 : {
2606 111 : Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2607 111 : return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2608 : }
2609 :
2610 :
2611 : /*
2612 : * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2613 : */
2614 :
2615 : /*
2616 : * Convert the existing unordered array of SortTuples to a bounded heap,
2617 : * discarding all but the smallest "state->bound" tuples.
2618 : *
2619 : * When working with a bounded heap, we want to keep the largest entry
2620 : * at the root (array entry zero), instead of the smallest as in the normal
2621 : * sort case. This allows us to discard the largest entry cheaply.
2622 : * Therefore, we temporarily reverse the sort direction.
2623 : */
2624 : static void
5819 tgl 2625 200 : make_bounded_heap(Tuplesortstate *state)
2626 : {
5624 bruce 2627 200 : int tupcount = state->memtupcount;
2628 : int i;
2629 :
5819 tgl 2630 200 : Assert(state->status == TSS_INITIAL);
2631 200 : Assert(state->bounded);
2632 200 : Assert(tupcount >= state->bound);
1892 rhaas 2633 200 : Assert(SERIAL(state));
2634 :
2635 : /* Reverse sort direction so largest entry will be at root */
3075 2636 200 : reversedirection(state);
2637 :
5819 tgl 2638 200 : state->memtupcount = 0; /* make the heap empty */
5624 bruce 2639 18948 : for (i = 0; i < tupcount; i++)
2640 : {
2401 heikki.linnakangas 2641 18748 : if (state->memtupcount < state->bound)
2642 : {
2643 : /* Insert next tuple into heap */
2644 : /* Must copy source tuple to avoid possible overwrite */
5624 bruce 2645 9274 : SortTuple stup = state->memtuples[i];
2646 :
2018 rhaas 2647 9274 : tuplesort_heap_insert(state, &stup);
2648 : }
2649 : else
2650 : {
2651 : /*
2652 : * The heap is full. Replace the largest entry with the new
2653 : * tuple, or just discard it, if it's larger than anything already
2654 : * in the heap.
2655 : */
2401 heikki.linnakangas 2656 9474 : if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2657 : {
2658 4884 : free_sort_tuple(state, &state->memtuples[i]);
2659 4884 : CHECK_FOR_INTERRUPTS();
2660 : }
2661 : else
2018 rhaas 2662 4590 : tuplesort_heap_replace_top(state, &state->memtuples[i]);
2663 : }
2664 : }
2665 :
5819 tgl 2666 200 : Assert(state->memtupcount == state->bound);
2667 200 : state->status = TSS_BOUNDED;
2668 200 : }
2669 :
2670 : /*
2671 : * Convert the bounded heap to a properly-sorted array
2672 : */
2673 : static void
2674 200 : sort_bounded_heap(Tuplesortstate *state)
2675 : {
5624 bruce 2676 200 : int tupcount = state->memtupcount;
2677 :
5819 tgl 2678 200 : Assert(state->status == TSS_BOUNDED);
2679 200 : Assert(state->bounded);
2680 200 : Assert(tupcount == state->bound);
1892 rhaas 2681 200 : Assert(SERIAL(state));
2682 :
2683 : /*
2684 : * We can unheapify in place because each delete-top call will remove the
2685 : * largest entry, which we can promptly store in the newly freed slot at
2686 : * the end. Once we're down to a single-entry heap, we're done.
2687 : */
5819 tgl 2688 9274 : while (state->memtupcount > 1)
2689 : {
5624 bruce 2690 9074 : SortTuple stup = state->memtuples[0];
2691 :
2692 : /* this sifts-up the next-largest entry and decreases memtupcount */
2018 rhaas 2693 9074 : tuplesort_heap_delete_top(state);
5819 tgl 2694 9074 : state->memtuples[state->memtupcount] = stup;
2695 : }
2696 200 : state->memtupcount = tupcount;
2697 :
2698 : /*
2699 : * Reverse sort direction back to the original state. This is not
2700 : * actually necessary but seems like a good idea for tidiness.
2701 : */
3075 rhaas 2702 200 : reversedirection(state);
2703 :
5819 tgl 2704 200 : state->status = TSS_SORTEDINMEM;
2705 200 : state->boundUsed = true;
2706 200 : }
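/*
 * Editor's sketch (not part of tuplesort.c): unheapifying in place, as
 * sort_bounded_heap() does above.  Each delete-top removes the max-heap
 * root, and the slot freed at the end of the array receives it, so the
 * array ends up in ascending order.  Reuses the hypothetical
 * topn_sift_down() from the previous sketch.
 */
static void
topn_heap_to_sorted(int *heap, size_t n)
{
	for (size_t count = n; count > 1; count--)
	{
		int			top = heap[0];

		/* delete-top: move the last element to the root and re-heapify */
		heap[0] = heap[count - 1];
		topn_sift_down(heap, count - 1, 0);
		/* the newly freed slot at the end receives the old largest entry */
		heap[count - 1] = top;
	}
}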
2707 :
2708 : /*
2709 : * Sort all memtuples using specialized qsort() routines.
2710 : *
2711 : * Quicksort is used both for small in-memory sorts and for building external sort runs.
2712 : */
2713 : static void
2557 rhaas 2714 135335 : tuplesort_sort_memtuples(Tuplesortstate *state)
2715 : {
1892 2716 135335 : Assert(!LEADER(state));
2717 :
2557 2718 135335 : if (state->memtupcount > 1)
2719 : {
2720 : /*
2721 : * Do we have the leading column's value or abbreviation in datum1,
2722 : * and is there a specialization for its comparator?
2723 : */
256 akorotkov 2724 GNC 41869 : if (state->base.haveDatum1 && state->base.sortKeys)
2725 : {
2726 41856 : if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2727 : {
370 tmunro 2728 GIC 2959 : qsort_tuple_unsigned(state->memtuples,
2729 2959 : state->memtupcount,
2730 : state);
2731 2947 : return;
2732 : }
2733 : #if SIZEOF_DATUM >= 8
256 akorotkov 2734 GNC 38897 : else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2735 : {
370 tmunro 2736 GIC 497 : qsort_tuple_signed(state->memtuples,
2737 497 : state->memtupcount,
2738 : state);
2739 497 : return;
2740 : }
2741 : #endif
256 akorotkov 2742 GNC 38400 : else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2743 : {
370 tmunro 2744 GIC 18636 : qsort_tuple_int32(state->memtuples,
2745 18636 : state->memtupcount,
2746 : state);
2747 18606 : return;
2748 : }
2749 : }
2750 :
2751 : /* Can we use the single-key sort function? */
256 akorotkov 2752 GNC 19777 : if (state->base.onlyKey != NULL)
2753 : {
2557 rhaas 2754 GIC 2972 : qsort_ssup(state->memtuples, state->memtupcount,
256 akorotkov 2755 GNC 2972 : state->base.onlyKey);
2756 : }
2757 : else
2758 : {
2557 rhaas 2759 GIC 16805 : qsort_tuple(state->memtuples,
2760 16805 : state->memtupcount,
2761 : state->base.comparetup,
2762 : state);
2763 : }
2764 : }
2765 : }
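/*
 * Editor's sketch (not part of tuplesort.c): the comparator-specialization
 * dispatch used by tuplesort_sort_memtuples() above, in miniature.
 * Recognizing the configured comparator by address costs one pointer
 * comparison per sort, and lets us branch to a sort variant whose inner
 * loop can inline the comparison instead of making an indirect call per
 * compare.  All names here are hypothetical, and the "specialized" routine
 * is only a stand-in for a separately instantiated sort.
 */
#include <stdlib.h>

static int
known_int32_cmp(const void *a, const void *b)
{
	int			xa = *(const int *) a;
	int			xb = *(const int *) b;

	return (xa > xb) - (xa < xb);
}

/* stand-in for a sort template instantiated with the comparator inlined */
static void
sort_specialized_int32(int *data, size_t n)
{
	qsort(data, n, sizeof(int), known_int32_cmp);
}

static void
sort_dispatch(int *data, size_t n,
			  int (*cmp) (const void *, const void *))
{
	if (cmp == known_int32_cmp)
		sort_specialized_int32(data, n);	/* inlined comparisons */
	else
		qsort(data, n, sizeof(int), cmp);	/* generic indirect calls */
}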
2766 :
2767 : /*
2768 : * Insert a new tuple into an empty or existing heap, maintaining the
2769 : * heap invariant. Caller is responsible for ensuring there's room.
2770 : *
2771 : * Note: For some callers, tuple points to a memtuples[] entry above the
2772 : * end of the heap. This is safe as long as it's not immediately adjacent
2773 : * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2774 : * is, it might get overwritten before being moved into the heap!
2775 : */
2776 : static void
2018 2777 10915 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2778 : {
2779 : SortTuple *memtuples;
2780 : int j;
2781 :
8562 tgl 2782 10915 : memtuples = state->memtuples;
6251 2783 10915 : Assert(state->memtupcount < state->memtupsize);
2784 :
4037 rhaas 2785 10915 : CHECK_FOR_INTERRUPTS();
2786 :
2787 : /*
2788 : * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2789 : * using 1-based array indexes, not 0-based.
2790 : */
8562 tgl 2791 10915 : j = state->memtupcount++;
2792 30531 : while (j > 0)
2793 : {
8397 bruce 2794 26524 : int i = (j - 1) >> 1;
2795 :
2018 rhaas 2796 26524 : if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
8575 tgl 2797 6908 : break;
8562 2798 19616 : memtuples[j] = memtuples[i];
8575 2799 19616 : j = i;
2800 : }
6251 2801 10915 : memtuples[j] = *tuple;
8575 2802 10915 : }
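/*
 * Editor's sketch (not part of tuplesort.c): the sift-up in
 * tuplesort_heap_insert() above, on a plain int min-heap.  Per Knuth 5.2.3
 * exercise 16, rather than repeatedly swapping, we hold the new entry
 * aside, slide each larger parent down into the hole, and store the new
 * entry exactly once at its final position.  Names are hypothetical.
 */
#include <stddef.h>

static void
minheap_insert(int *heap, size_t *heapcount, int newval)
{
	size_t		j = (*heapcount)++;	/* hole starts at the new last slot */

	while (j > 0)
	{
		size_t		i = (j - 1) / 2;	/* parent of j (0-based indexes) */

		if (newval >= heap[i])
			break;
		heap[j] = heap[i];		/* move the larger parent down */
		j = i;
	}
	heap[j] = newval;
}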
2803 :
2804 : /*
2805 : * Remove the tuple at state->memtuples[0] from the heap. Decrement
2806 : * memtupcount, and sift up to maintain the heap invariant.
2807 : *
2808 : * The caller has already free'd the tuple the top node points to,
2809 : * if necessary.
2810 : */
2811 : static void
2018 rhaas 2812 10700 : tuplesort_heap_delete_top(Tuplesortstate *state)
2813 : {
6251 tgl 2814 10700 : SortTuple *memtuples = state->memtuples;
2815 : SortTuple *tuple;
2816 :
8562 2817 10700 : if (--state->memtupcount <= 0)
8575 2818 330 : return;
2819 :
2820 : /*
2821 : * Remove the last tuple in the heap, and re-insert it, by replacing the
2822 : * current top node with it.
2823 : */
2401 heikki.linnakangas 2824 10370 : tuple = &memtuples[state->memtupcount];
2018 rhaas 2825 10370 : tuplesort_heap_replace_top(state, tuple);
2826 : }
2827 :
2828 : /*
2829 : * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2830 : * maintain the heap invariant.
2831 : *
2832 : * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2833 : * Heapsort, steps H3-H8).
2834 : */
2835 : static void
2836 4124796 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2837 : {
2401 heikki.linnakangas 2838 4124796 : SortTuple *memtuples = state->memtuples;
2839 : unsigned int i,
2840 : n;
2841 :
2842 4124796 : Assert(state->memtupcount >= 1);
2843 :
4037 rhaas 2844 4124796 : CHECK_FOR_INTERRUPTS();
2845 :
2846 : /*
2847 : * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2848 : * This prevents overflow in the "2 * i + 1" calculation, since at the top
2849 : * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2850 : */
8562 tgl 2851 4124796 : n = state->memtupcount;
8575 2852 4124796 : i = 0; /* i is where the "hole" is */
2853 : for (;;)
8562 2854 1109384 : {
2097 2855 5234180 : unsigned int j = 2 * i + 1;
2856 :
8575 2857 5234180 : if (j >= n)
2858 1010574 : break;
8397 bruce 2859 5638577 : if (j + 1 < n &&
2018 rhaas 2860 1414971 : COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
8575 tgl 2861 575668 : j++;
2018 rhaas 2862 4223606 : if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
8575 tgl 2863 3114222 : break;
8562 2864 1109384 : memtuples[i] = memtuples[j];
8575 2865 1109384 : i = j;
2866 : }
6251 2867 4124796 : memtuples[i] = *tuple;
8575 2868 4124796 : }
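/*
 * Editor's sketch (not part of tuplesort.c): the replace-top sift in
 * tuplesort_heap_replace_top() above, on a plain int min-heap.  The hole
 * starts at the root; the smaller child is pulled up until the new entry
 * fits.  As in the code above, unsigned indexes keep "2 * i + 1" from
 * overflowing when the heap size approaches INT_MAX.  Names are
 * hypothetical.
 */
#include <stddef.h>

static void
minheap_replace_top(int *heap, size_t n, int newval)
{
	size_t		i = 0;			/* i is where the "hole" is */

	for (;;)
	{
		size_t		j = 2 * i + 1;	/* left child */

		if (j >= n)
			break;
		if (j + 1 < n && heap[j + 1] < heap[j])
			j++;				/* prefer the smaller child */
		if (newval <= heap[j])
			break;
		heap[i] = heap[j];		/* pull the child up into the hole */
		i = j;
	}
	heap[i] = newval;
}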
2869 :
2870 : /*
2871 : * Reverse the sort direction from its current state.
2872 : *
2873 : * It is not safe to call this when performing hash tuplesorts.
2874 : */
2875 : static void
3075 rhaas 2876 400 : reversedirection(Tuplesortstate *state)
2877 : {
256 akorotkov 2878 GNC 400 : SortSupport sortKey = state->base.sortKeys;
2879 : int nkey;
2880 :
2881 980 : for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2882 : {
3075 rhaas 2883 GIC 580 : sortKey->ssup_reverse = !sortKey->ssup_reverse;
2884 580 : sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2885 : }
2886 400 : }
2887 :
2888 :
2889 : /*
2890 : * Tape interface routines
2891 : */
2892 :
2893 : static unsigned int
538 heikki.linnakangas 2894 3998265 : getlen(LogicalTape *tape, bool eofOK)
2895 : {
2896 : unsigned int len;
2897 :
2898 3998265 : if (LogicalTapeRead(tape,
2899 : &len, sizeof(len)) != sizeof(len))
7198 tgl 2900 UIC 0 : elog(ERROR, "unexpected end of tape");
8575 tgl 2901 GIC 3998265 : if (len == 0 && !eofOK)
7198 tgl 2902 UIC 0 : elog(ERROR, "unexpected end of data");
8575 tgl 2903 GIC 3998265 : return len;
2904 : }
2905 :
2906 : static void
538 heikki.linnakangas 2907 1825 : markrunend(LogicalTape *tape)
2908 : {
8397 bruce 2909 1825 : unsigned int len = 0;
2910 :
100 peter 2911 GNC 1825 : LogicalTapeWrite(tape, &len, sizeof(len));
8575 tgl 2912 GIC 1825 : }
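/*
 * Editor's sketch (not part of tuplesort.c): the run framing that
 * getlen()/markrunend() above rely on, shown against plain stdio rather
 * than logtape.c.  Every tuple on tape is a length word followed by its
 * payload, and a zero length word terminates a run, which is why a zero
 * length is only acceptable when the caller passes eofOK.  All names are
 * hypothetical and write-side error handling is elided.
 */
#include <stdio.h>
#include <stdlib.h>

static void
write_record(FILE *fp, const void *data, unsigned int len)
{
	fwrite(&len, sizeof(len), 1, fp);
	fwrite(data, len, 1, fp);
}

static void
write_runend(FILE *fp)
{
	unsigned int len = 0;

	fwrite(&len, sizeof(len), 1, fp);	/* zero length == end of run */
}

/* Returns the record length, or 0 at end of run when eof_ok. */
static unsigned int
read_len(FILE *fp, int eof_ok)
{
	unsigned int len;

	if (fread(&len, sizeof(len), 1, fp) != 1)
	{
		fprintf(stderr, "unexpected end of tape\n");
		exit(1);
	}
	if (len == 0 && !eof_ok)
	{
		fprintf(stderr, "unexpected end of data\n");
		exit(1);
	}
	return len;
}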
2913 :
2914 : /*
2915 : * Get memory for tuple from within READTUP() routine.
2916 : *
2917 : * We use the next free slot from the slab allocator, or palloc() if the
2918 : * tuple is too large for that.
2919 : */
2920 : void *
256 akorotkov 2921 GNC 3846345 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2922 : {
2923 : SlabSlot *buf;
2924 :
2925 : /*
2926 : * We pre-allocate enough slots in the slab arena that we should never run
2927 : * out.
2928 : */
2379 heikki.linnakangas 2929 GIC 3846345 : Assert(state->slabFreeHead);
2930 :
2931 3846345 : if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
256 akorotkov 2932 UNC 0 : return MemoryContextAlloc(state->base.sortcontext, tuplen);
2933 : else
2934 : {
2379 heikki.linnakangas 2935 GIC 3846345 : buf = state->slabFreeHead;
2936 : /* Reuse this slot */
2937 3846345 : state->slabFreeHead = buf->nextfree;
2938 :
2939 3846345 : return buf;
2940 : }
2941 : }
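/*
 * Editor's sketch (not part of tuplesort.c): the fixed-size free-list
 * pattern behind tuplesort_readtup_alloc() above.  Slots live in a
 * preallocated arena and are chained through a "nextfree" pointer that
 * overlays the buffer space; allocation pops the head of the list, and a
 * request too large for a slot falls back to the general-purpose
 * allocator.  All names are hypothetical.
 */
#include <stdlib.h>

#define MY_SLOT_SIZE 1024

typedef union MySlabSlot
{
	union MySlabSlot *nextfree;
	char		buffer[MY_SLOT_SIZE];
} MySlabSlot;

static MySlabSlot *slab_free_head = NULL;

static void
slab_init(MySlabSlot *arena, size_t nslots)
{
	/* chain every slot of the arena onto the free list */
	for (size_t i = 0; i < nslots; i++)
		arena[i].nextfree = (i + 1 < nslots) ? &arena[i + 1] : NULL;
	slab_free_head = arena;
}

static void *
slab_alloc(size_t len)
{
	MySlabSlot *buf;

	if (len > MY_SLOT_SIZE || slab_free_head == NULL)
		return malloc(len);		/* too large (or exhausted): fall back */

	buf = slab_free_head;
	slab_free_head = buf->nextfree;		/* pop and reuse this slot */
	return buf;
}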
2942 :
2943 :
2944 : /*
2945 : * Parallel sort routines
2946 : */
2947 :
2948 : /*
2949 : * tuplesort_estimate_shared - estimate required shared memory allocation
2950 : *
2951 : * nWorkers is an estimate of the number of workers (it's the number that
2952 : * will be requested).
2953 : */
2954 : Size
1892 rhaas 2955 71 : tuplesort_estimate_shared(int nWorkers)
2956 : {
2957 : Size tapesSize;
2958 :
2959 71 : Assert(nWorkers > 0);
2960 :
2961 : /* Make sure that BufFile shared state is MAXALIGN'd */
2962 71 : tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2963 71 : tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2964 :
2965 71 : return tapesSize;
2966 : }
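/*
 * Editor's sketch (not part of tuplesort.c): the shape of the estimate
 * above, in plain C.  mul_size()/add_size() are PostgreSQL helpers that
 * raise an error on overflow, and MAXALIGN() rounds up to the platform's
 * maximum alignment; the stand-ins below skip the overflow checks and
 * assume 8-byte alignment, so treat them as illustrative only.
 */
#include <stddef.h>
#include <stdint.h>

#define MY_MAXALIGN(len) \
	(((uintptr_t) (len) + 7) & ~(uintptr_t) 7)

typedef struct MyTapeShare
{
	long		firstblocknumber;
} MyTapeShare;

typedef struct MySharedSort
{
	int			currentWorker;
	int			workersFinished;
	int			nTapes;
	MyTapeShare tapes[];		/* one entry per worker, at the end */
} MySharedSort;

static size_t
my_estimate_shared(int nWorkers)
{
	/* fixed header plus per-worker tape metadata, kept MAXALIGN'd */
	size_t		tapesSize = sizeof(MyTapeShare) * (size_t) nWorkers;

	return MY_MAXALIGN(offsetof(MySharedSort, tapes) + tapesSize);
}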
2967 :
2968 : /*
2969 : * tuplesort_initialize_shared - initialize shared tuplesort state
2970 : *
2971 : * Must be called from leader process before workers are launched, to
2972 : * establish state needed up-front for worker tuplesortstates. nWorkers
2973 : * should match the argument passed to tuplesort_estimate_shared().
2974 : */
2975 : void
2976 103 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2977 : {
2978 : int i;
2979 :
2980 103 : Assert(nWorkers > 0);
2981 :
2982 103 : SpinLockInit(&shared->mutex);
2983 103 : shared->currentWorker = 0;
2984 103 : shared->workersFinished = 0;
2985 103 : SharedFileSetInit(&shared->fileset, seg);
2986 103 : shared->nTapes = nWorkers;
2987 309 : for (i = 0; i < nWorkers; i++)
2988 : {
2989 206 : shared->tapes[i].firstblocknumber = 0L;
2990 : }
2991 103 : }
2992 :
2993 : /*
2994 : * tuplesort_attach_shared - attach to shared tuplesort state
2995 : *
2996 : * Must be called by all worker processes.
2997 : */
2998 : void
2999 103 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
3000 : {
3001 : /* Attach to SharedFileSet */
3002 103 : SharedFileSetAttach(&shared->fileset, seg);
3003 103 : }
3004 :
3005 : /*
3006 : * worker_get_identifier - Assign and return ordinal identifier for worker
3007 : *
3008 : * The order in which these are assigned is not well defined, and should not
3009 : * matter; worker numbers across parallel sort participants need only be
3010 : * distinct and gapless. logtape.c requires this.
3011 : *
3012 : * Note that the identifiers assigned from here have no relation to
3013 : * ParallelWorkerNumber, to avoid making any assumptions about the
3014 : * caller's requirements. However, we do follow the ParallelWorkerNumber
3015 : * convention of representing a non-worker with worker number -1. This
3016 : * includes the leader, as well as serial Tuplesort processes.
3017 : */
3018 : static int
3019 206 : worker_get_identifier(Tuplesortstate *state)
3020 : {
3021 206 : Sharedsort *shared = state->shared;
3022 : int worker;
3023 :
3024 206 : Assert(WORKER(state));
3025 :
3026 206 : SpinLockAcquire(&shared->mutex);
3027 206 : worker = shared->currentWorker++;
3028 206 : SpinLockRelease(&shared->mutex);
3029 :
3030 206 : return worker;
3031 : }
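/*
 * Editor's sketch (not part of tuplesort.c): the distinct-and-gapless
 * ordinal assignment in worker_get_identifier() above.  The real code
 * serializes on a spinlock in shared memory; for a standalone sketch, a
 * C11 atomic fetch-add gives the same guarantee among threads.  Names are
 * hypothetical.
 */
#include <stdatomic.h>

static atomic_int my_current_worker;

static int
my_get_identifier(void)
{
	/* each caller receives the next ordinal; none skipped, none reused */
	return atomic_fetch_add(&my_current_worker, 1);
}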
3032 :
3033 : /*
3034 : * worker_freeze_result_tape - freeze worker's result tape for leader
3035 : *
3036 : * Workers call this just after their result tape has been determined,
3037 : * rather than calling LogicalTapeFreeze() directly, because a worker
3038 : * needs a few steps beyond the similar serial TSS_SORTEDONTAPE external
3039 : * sort case; those extra steps also happen here. They consist of freeing
3040 : * resources that are no longer needed, and advertising to the leader
3041 : * that the worker's run is available as input to its merge.
3042 : *
3043 : * There should only be one final output run for each worker, which consists
3044 : * of all tuples that were originally input into the worker.
3045 : */
3046 : static void
3047 206 : worker_freeze_result_tape(Tuplesortstate *state)
3048 : {
3049 206 : Sharedsort *shared = state->shared;
3050 : TapeShare output;
3051 :
3052 206 : Assert(WORKER(state));
538 heikki.linnakangas 3053 206 : Assert(state->result_tape != NULL);
1892 rhaas 3054 206 : Assert(state->memtupcount == 0);
3055 :
3056 : /*
3057 : * Free most remaining memory, in case caller is sensitive to our holding
3058 : * on to it. memtuples may not be a tiny merge heap at this point.
3059 : */
3060 206 : pfree(state->memtuples);
3061 : /* Be tidy */
3062 206 : state->memtuples = NULL;
3063 206 : state->memtupsize = 0;
3064 :
3065 : /*
3066 : * A parallel worker must store its result tape metadata in shared
3067 : * memory, where the leader can find it
3068 : */
538 heikki.linnakangas 3069 206 : LogicalTapeFreeze(state->result_tape, &output);
3070 :
3071 : /* Store properties of output tape, and update finished worker count */
1892 rhaas 3072 206 : SpinLockAcquire(&shared->mutex);
3073 206 : shared->tapes[state->worker] = output;
3074 206 : shared->workersFinished++;
3075 206 : SpinLockRelease(&shared->mutex);
3076 206 : }
3077 :
3078 : /*
3079 : * worker_nomergeruns - dump memtuples in worker, without merging
3080 : *
3081 : * This called as an alternative to mergeruns() with a worker when no
3082 : * merging is required.
3083 : */
3084 : static void
3085 205 : worker_nomergeruns(Tuplesortstate *state)
3086 : {
3087 205 : Assert(WORKER(state));
538 heikki.linnakangas 3088 205 : Assert(state->result_tape == NULL);
3089 205 : Assert(state->nOutputRuns == 1);
3090 :
3091 205 : state->result_tape = state->destTape;
1892 rhaas 3092 205 : worker_freeze_result_tape(state);
3093 205 : }
3094 :
3095 : /*
3096 : * leader_takeover_tapes - create tapeset for leader from worker tapes
3097 : *
3098 : * So far, the leader Tuplesortstate has performed no actual sorting. By
3099 : * now, all sorting has occurred in the workers, all of which must have
3100 : * already returned from tuplesort_performsort().
3101 : *
3102 : * When this returns, the leader process is left in a state virtually
3103 : * indistinguishable from having generated the runs itself, as a serial
3104 : * external sort would have.
3105 : */
3106 : static void
3107 71 : leader_takeover_tapes(Tuplesortstate *state)
3108 : {
3109 71 : Sharedsort *shared = state->shared;
3110 71 : int nParticipants = state->nParticipants;
3111 : int workersFinished;
3112 : int j;
3113 :
3114 71 : Assert(LEADER(state));
3115 71 : Assert(nParticipants >= 1);
3116 :
3117 71 : SpinLockAcquire(&shared->mutex);
3118 71 : workersFinished = shared->workersFinished;
3119 71 : SpinLockRelease(&shared->mutex);
3120 :
3121 71 : if (nParticipants != workersFinished)
1892 rhaas 3122 UIC 0 : elog(ERROR, "cannot take over tapes before all workers finish");
3123 :
3124 : /*
3125 : * Create the tapeset from the worker tapes. Parallel workers are far
3126 : * more expensive than logical tapes, so the number of tapes allocated
3127 : * here should never be excessive.
3128 : */
538 heikki.linnakangas 3129 GIC 71 : inittapestate(state, nParticipants);
3130 71 : state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3131 :
3132 : /*
3133 : * Set currentRun to reflect the number of runs we will merge (it's not
3134 : * used for anything, this is just pro forma)
3135 : */
1892 rhaas 3136 71 : state->currentRun = nParticipants;
3137 :
3138 : /*
3139 : * Initialize the state to look the same as after building the initial
3140 : * runs.
3141 : *
3142 : * There will always be exactly 1 run per worker, and exactly one input
3143 : * tape per run, because workers always output exactly 1 run, even when
3144 : * there were no input tuples for workers to sort.
3145 : */
538 heikki.linnakangas 3146 71 : state->inputTapes = NULL;
3147 71 : state->nInputTapes = 0;
3148 71 : state->nInputRuns = 0;
3149 :
3150 71 : state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3151 71 : state->nOutputTapes = nParticipants;
3152 71 : state->nOutputRuns = nParticipants;
3153 :
3154 213 : for (j = 0; j < nParticipants; j++)
3155 : {
3156 142 : state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3157 : }
3158 :
1892 rhaas 3159 71 : state->status = TSS_BUILDRUNS;
3160 71 : }
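/*
 * Editor's sketch (not part of tuplesort.c): the takeover handshake in
 * leader_takeover_tapes() above.  The leader reads the finished-worker
 * count under the same lock the workers use to publish it, and refuses to
 * proceed until every participant has published a result tape.  Names are
 * hypothetical; pthread locking stands in for the spinlock.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct MySharedState
{
	pthread_mutex_t mutex;
	int			workersFinished;
} MySharedState;

static void
my_takeover_check(MySharedState *shared, int nParticipants)
{
	int			workersFinished;

	pthread_mutex_lock(&shared->mutex);
	workersFinished = shared->workersFinished;
	pthread_mutex_unlock(&shared->mutex);

	/* every worker publishes exactly one run, even if it sorted nothing */
	if (workersFinished != nParticipants)
	{
		fprintf(stderr, "cannot take over tapes before all workers finish\n");
		exit(1);
	}
}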
3161 :
3162 : /*
3163 : * Convenience routine to free a tuple previously loaded into sort memory
3164 : */
3165 : static void
5819 tgl 3166 1872895 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3167 : {
635 drowley 3168 1872895 : if (stup->tuple)
3169 : {
3170 1791797 : FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3171 1791797 : pfree(stup->tuple);
3172 1791797 : stup->tuple = NULL;
3173 : }
5819 tgl 3174 1872895 : }
3175 :
3176 : int
372 john.naylor 3177 2074284 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3178 : {
3179 2074284 : if (x < y)
372 john.naylor 3180 UIC 0 : return -1;
372 john.naylor 3181 GIC 2074284 : else if (x > y)
372 john.naylor 3182 UIC 0 : return 1;
3183 : else
372 john.naylor 3184 GIC 2074284 : return 0;
3185 : }
3186 :
3187 : #if SIZEOF_DATUM >= 8
3188 : int
3189 562926 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3190 : {
333 drowley 3191 562926 : int64 xx = DatumGetInt64(x);
3192 562926 : int64 yy = DatumGetInt64(y);
3193 :
372 john.naylor 3194 562926 : if (xx < yy)
3195 208602 : return -1;
3196 354324 : else if (xx > yy)
3197 173964 : return 1;
3198 : else
3199 180360 : return 0;
3200 : }
3201 : #endif
3202 :
3203 : int
3204 118722719 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3205 : {
333 drowley 3206 118722719 : int32 xx = DatumGetInt32(x);
3207 118722719 : int32 yy = DatumGetInt32(y);
3208 :
372 john.naylor 3209 118722719 : if (xx < yy)
3210 25155852 : return -1;
3211 93566867 : else if (xx > yy)
3212 23604938 : return 1;
3213 : else
3214 69961929 : return 0;
3215 : }
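/*
 * Editor's note (not part of tuplesort.c): the comparators above use
 * explicit branches rather than returning "xx - yy".  Subtraction would be
 * wrong: for int32, (-2000000000) - 2000000000 overflows, which is
 * undefined behavior, and a truncated wider difference can report the
 * wrong sign.  A branch-light alternative, with a hypothetical name:
 */
#include <stdint.h>

static int
my_int32_cmp3way(int32_t xx, int32_t yy)
{
	/* (a > b) - (a < b) yields -1, 0, or +1 with no overflow risk */
	return (xx > yy) - (xx < yy);
}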