LCOV - differential code coverage report
Current view:   top level - src/backend/utils/sort - tuplesort.c (source / functions)
Current:        Differential Code Coverage HEAD vs 15
Current Date:   2023-04-08 15:15:32
Baseline:       15
Baseline Date:  2023-04-08 15:09:40
Legend:         Lines: hit / not hit

             Coverage   Total  Hit   UNC  LBC  UIC  UBC  GBC  GIC  GNC  CBC  EUB  ECB  DUB  DCB
Lines:       89.5 %     869    778   1    15   73   2    8    558  80   132  45   388  36   243
Functions:   100.0 %    55     55    (per-category breakdown as reported: 53, 2, 37, 18)

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * tuplesort.c
       4                 :  *    Generalized tuple sorting routines.
       5                 :  *
       6                 :  * This module provides a generalized facility for tuple sorting, which can be
       7                 :  * applied to different kinds of sortable objects.  Implementation of
       8                 :  * the particular sorting variants is given in tuplesortvariants.c.
       9                 :  * This module works efficiently for both small and large amounts
      10                 :  * of data.  Small amounts are sorted in-memory using qsort().  Large
      11                 :  * amounts are sorted using temporary files and a standard external sort
      12                 :  * algorithm.
      13                 :  *
      14                 :  * See Knuth, volume 3, for more than you want to know about external
      15                 :  * sorting algorithms.  The algorithm we use is a balanced k-way merge.
      16                 :  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
      17                 :  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
      18                 :  * merge is better.  Knuth is assuming that tape drives are expensive
      19                 :  * beasts, and in particular that there will always be many more runs than
      20                 :  * tape drives.  The polyphase merge algorithm was good at keeping all the
      21                 :  * tape drives busy, but in our implementation a "tape drive" doesn't cost
      22                 :  * much more than a few Kb of memory buffers, so we can afford to have
      23                 :  * lots of them.  In particular, if we can have as many tape drives as
       24                 :  * sorted runs, we can eliminate repeated I/O entirely.
      25                 :  *
      26                 :  * Historically, we divided the input into sorted runs using replacement
      27                 :  * selection, in the form of a priority tree implemented as a heap
      28                 :  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
      29                 :  * for run generation.
      30                 :  *
      31                 :  * The approximate amount of memory allowed for any one sort operation
      32                 :  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
      33                 :  * we absorb tuples and simply store them in an unsorted array as long as
      34                 :  * we haven't exceeded workMem.  If we reach the end of the input without
      35                 :  * exceeding workMem, we sort the array using qsort() and subsequently return
      36                 :  * tuples just by scanning the tuple array sequentially.  If we do exceed
      37                 :  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
      38                 :  * When tuples are dumped in batch after quicksorting, we begin a new run
      39                 :  * with a new output tape.  If we reach the max number of tapes, we write
      40                 :  * subsequent runs on the existing tapes in a round-robin fashion.  We will
      41                 :  * need multiple merge passes to finish the merge in that case.  After the
      42                 :  * end of the input is reached, we dump out remaining tuples in memory into
      43                 :  * a final run, then merge the runs.
      44                 :  *
      45                 :  * When merging runs, we use a heap containing just the frontmost tuple from
      46                 :  * each source run; we repeatedly output the smallest tuple and replace it
      47                 :  * with the next tuple from its source tape (if any).  When the heap empties,
      48                 :  * the merge is complete.  The basic merge algorithm thus needs very little
      49                 :  * memory --- only M tuples for an M-way merge, and M is constrained to a
      50                 :  * small number.  However, we can still make good use of our full workMem
      51                 :  * allocation by pre-reading additional blocks from each source tape.  Without
      52                 :  * prereading, our access pattern to the temporary file would be very erratic;
      53                 :  * on average we'd read one block from each of M source tapes during the same
      54                 :  * time that we're writing M blocks to the output tape, so there is no
      55                 :  * sequentiality of access at all, defeating the read-ahead methods used by
      56                 :  * most Unix kernels.  Worse, the output tape gets written into a very random
      57                 :  * sequence of blocks of the temp file, ensuring that things will be even
      58                 :  * worse when it comes time to read that tape.  A straightforward merge pass
      59                 :  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
      60                 :  * by prereading from each source tape sequentially, loading about workMem/M
      61                 :  * bytes from each tape in turn, and making the sequential blocks immediately
      62                 :  * available for reuse.  This approach helps to localize both read and write
       63                 :  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
      64                 :  * much memory to use for the buffers.
      65                 :  *
      66                 :  * In the current code we determine the number of input tapes M on the basis
      67                 :  * of workMem: we want workMem/M to be large enough that we read a fair
      68                 :  * amount of data each time we read from a tape, so as to maintain the
      69                 :  * locality of access described above.  Nonetheless, with large workMem we
      70                 :  * can have many tapes.  The logical "tapes" are implemented by logtape.c,
      71                 :  * which avoids space wastage by recycling disk space as soon as each block
      72                 :  * is read from its "tape".
      73                 :  *
      74                 :  * When the caller requests random access to the sort result, we form
      75                 :  * the final sorted run on a logical tape which is then "frozen", so
      76                 :  * that we can access it randomly.  When the caller does not need random
      77                 :  * access, we return from tuplesort_performsort() as soon as we are down
      78                 :  * to one run per logical tape.  The final merge is then performed
      79                 :  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
      80                 :  * saves one cycle of writing all the data out to disk and reading it in.
      81                 :  *
      82                 :  * This module supports parallel sorting.  Parallel sorts involve coordination
      83                 :  * among one or more worker processes, and a leader process, each with its own
      84                 :  * tuplesort state.  The leader process (or, more accurately, the
      85                 :  * Tuplesortstate associated with a leader process) creates a full tapeset
       86                 :  * consisting of worker tapes, each with one run to merge: a run for every
      87                 :  * worker process.  This is then merged.  Worker processes are guaranteed to
      88                 :  * produce exactly one output run from their partial input.
      89                 :  *
      90                 :  *
      91                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      92                 :  * Portions Copyright (c) 1994, Regents of the University of California
      93                 :  *
      94                 :  * IDENTIFICATION
      95                 :  *    src/backend/utils/sort/tuplesort.c
      96                 :  *
      97                 :  *-------------------------------------------------------------------------
      98                 :  */
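
The merge loop described above (a heap holding each run's frontmost tuple,
popped and refilled until the heap empties) is the heart of the algorithm.
Below is a minimal self-contained sketch in plain C; it is not taken from
tuplesort.c, and run_t with an integer payload is an illustrative stand-in
for SortTuple plus the logical-tape machinery.

    #include <stddef.h>

    /* Hypothetical stand-in for one sorted run. */
    typedef struct
    {
        const int  *vals;           /* run contents, already sorted */
        size_t      pos;            /* next element to read */
        size_t      len;
    } run_t;

    /* One heap entry: a run's frontmost value, plus where it came from. */
    typedef struct
    {
        int         val;
        run_t      *src;
    } heap_ent;

    static void
    sift_down(heap_ent *heap, size_t n, size_t i)
    {
        for (;;)
        {
            size_t      l = 2 * i + 1,
                        r = l + 1,
                        m = i;

            if (l < n && heap[l].val < heap[m].val)
                m = l;
            if (r < n && heap[r].val < heap[m].val)
                m = r;
            if (m == i)
                return;
            heap_ent    tmp = heap[i];

            heap[i] = heap[m];
            heap[m] = tmp;
            i = m;
        }
    }

    /* Merge nruns sorted runs into out[]; returns the count written. */
    static size_t
    kway_merge(run_t *runs, size_t nruns, int *out)
    {
        heap_ent    heap[64];       /* assume nruns <= 64 for the sketch */
        size_t      n = 0,
                    written = 0;

        /* Prime the heap with the frontmost element of each run. */
        for (size_t i = 0; i < nruns; i++)
            if (runs[i].pos < runs[i].len)
                heap[n++] = (heap_ent) {runs[i].vals[runs[i].pos++], &runs[i]};
        for (size_t i = n; i-- > 0;)
            sift_down(heap, n, i);

        /* Repeatedly emit the minimum; replace it from its source run. */
        while (n > 0)
        {
            run_t      *src = heap[0].src;

            out[written++] = heap[0].val;
            if (src->pos < src->len)
                heap[0] = (heap_ent) {src->vals[src->pos++], src};
            else
                heap[0] = heap[--n];
            sift_down(heap, n, 0);
        }
        return written;
    }
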
      99                 : 
     100                 : #include "postgres.h"
     101                 : 
     102                 : #include <limits.h>
     103                 : 
     104                 : #include "catalog/pg_am.h"
     105                 : #include "commands/tablespace.h"
     106                 : #include "executor/executor.h"
     107                 : #include "miscadmin.h"
     108                 : #include "pg_trace.h"
     109                 : #include "storage/shmem.h"
     110                 : #include "utils/memutils.h"
     111                 : #include "utils/pg_rusage.h"
     112                 : #include "utils/rel.h"
     113                 : #include "utils/tuplesort.h"
     114                 : 
     115                 : /*
      116                 :  * Initial size of the memtuples array.  We try to select this size so that
      117                 :  * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
      118                 :  * overhead of allocation is kept low.  However, we don't consider array sizes
     119                 :  * less than 1024.
     120                 :  *
     121                 :  */
     122                 : #define INITIAL_MEMTUPSIZE Max(1024, \
     123                 :     ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
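
As a worked example (both sizes are assumptions for illustration, not taken
from any particular build): with ALLOCSET_SEPARATE_THRESHOLD at 8192 bytes
and sizeof(SortTuple) at 24 bytes, the second Max() argument is
8192 / 24 + 1 = 342, so the macro yields 1024, and the initial array costs
about 1024 * 24 = 24 kB, which is counted against workMem.
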
     124                 : 
     125                 : /* GUC variables */
     126                 : #ifdef TRACE_SORT
     127                 : bool        trace_sort = false;
     128                 : #endif
     129                 : 
     130                 : #ifdef DEBUG_BOUNDED_SORT
     131                 : bool        optimize_bounded_sort = true;
     132                 : #endif
     133                 : 
     134                 : 
     135                 : /*
      136                 :  * During merge, we use a pre-allocated set of fixed-size slots to hold
      137                 :  * tuples, to avoid palloc/pfree overhead.
      138                 :  *
      139                 :  * Merge doesn't require a lot of memory, so we can afford to waste some
      140                 :  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
      141                 :  * palloc() overhead is not significant anymore.
     142                 :  *
     143                 :  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
     144                 :  * slot holds a tuple.
     145                 :  */
     146                 : #define SLAB_SLOT_SIZE 1024
     147                 : 
     148                 : typedef union SlabSlot
     149                 : {
     150                 :     union SlabSlot *nextfree;
     151                 :     char        buffer[SLAB_SLOT_SIZE];
     152                 : } SlabSlot;
     153                 : 
     154                 : /*
     155                 :  * Possible states of a Tuplesort object.  These denote the states that
     156                 :  * persist between calls of Tuplesort routines.
     157                 :  */
     158                 : typedef enum
     159                 : {
     160                 :     TSS_INITIAL,                /* Loading tuples; still within memory limit */
     161                 :     TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
     162                 :     TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
     163                 :     TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
     164                 :     TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
     165                 :     TSS_FINALMERGE              /* Performing final merge on-the-fly */
     166                 : } TupSortStatus;
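
For orientation, the typical transitions among these states, summarized from
the file header comment above:

    TSS_INITIAL -> TSS_SORTEDINMEM                      (input fit in workMem)
    TSS_INITIAL -> TSS_BOUNDED -> TSS_SORTEDINMEM       (bounded, top-N sort)
    TSS_INITIAL -> TSS_BUILDRUNS -> TSS_SORTEDONTAPE    (spilled, random access)
    TSS_INITIAL -> TSS_BUILDRUNS -> TSS_FINALMERGE      (spilled, on-the-fly merge)
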
     167                 : 
     168                 : /*
     169                 :  * Parameters for calculation of number of tapes to use --- see inittapes()
     170                 :  * and tuplesort_merge_order().
      171                 :  * In this calculation we assume that each tape will cost us about one
      172                 :  * block's worth of buffer space.  This ignores the overhead of all the other data
     173                 :  * worth of buffer space.  This ignores the overhead of all the other data
     174                 :  * structures needed for each tape, but it's probably close enough.
     175                 :  *
     176                 :  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
     177                 :  * input tape, for pre-reading (see discussion at top of file).  This is *in
     178                 :  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
     179                 :  */
     180                 : #define MINORDER        6       /* minimum merge order */
     181                 : #define MAXORDER        500     /* maximum merge order */
     182                 : #define TAPE_BUFFER_OVERHEAD        BLCKSZ
     183                 : #define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
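
A rough sketch of how these constants combine; the authoritative version is
tuplesort_merge_order(), which lies outside this excerpt.  Each of the M
input tapes wants MERGE_BUFFER_SIZE of pre-read workspace plus
TAPE_BUFFER_OVERHEAD, and we budget about one more TAPE_BUFFER_OVERHEAD per
output tape:

    static int
    sketch_merge_order(int64 allowedMem)
    {
        int         mOrder = (int) (allowedMem /
                                    (MERGE_BUFFER_SIZE + 2 * TAPE_BUFFER_OVERHEAD));

        mOrder = Max(mOrder, MINORDER); /* always merge at least 6 ways */
        mOrder = Min(mOrder, MAXORDER); /* very wide merges thrash CPU caches */
        return mOrder;
    }

For example, assuming 8 kB blocks, 4 MB of allowedMem gives
4194304 / (262144 + 2 * 8192) = 15, comfortably inside the [6, 500] clamp.
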
     184                 : 
     185                 : 
     186                 : /*
     187                 :  * Private state of a Tuplesort operation.
     188                 :  */
     189                 : struct Tuplesortstate
     190                 : {
     191                 :     TuplesortPublic base;
     192                 :     TupSortStatus status;       /* enumerated value as shown above */
     193                 :     bool        bounded;        /* did caller specify a maximum number of
     194                 :                                  * tuples to return? */
     195                 :     bool        boundUsed;      /* true if we made use of a bounded heap */
     196                 :     int         bound;          /* if bounded, the maximum number of tuples */
     197                 :     int64       availMem;       /* remaining memory available, in bytes */
     198                 :     int64       allowedMem;     /* total memory allowed, in bytes */
     199                 :     int         maxTapes;       /* max number of input tapes to merge in each
     200                 :                                  * pass */
      201                 :     int64       maxSpace;       /* maximum amount of space occupied across
      202                 :                                  * sort groups, either in-memory or on-disk */
      203                 :     bool        isMaxSpaceDisk; /* true when maxSpace is the value for on-disk
      204                 :                                  * space, false when it's the value for in-memory
     205                 :                                  * space */
     206                 :     TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
     207                 :     LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
     208                 : 
     209                 :     /*
     210                 :      * This array holds the tuples now in sort memory.  If we are in state
     211                 :      * INITIAL, the tuples are in no particular order; if we are in state
     212                 :      * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     213                 :      * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     214                 :      * H.  In state SORTEDONTAPE, the array is not used.
     215                 :      */
     216                 :     SortTuple  *memtuples;      /* array of SortTuple structs */
     217                 :     int         memtupcount;    /* number of tuples currently present */
     218                 :     int         memtupsize;     /* allocated length of memtuples array */
     219                 :     bool        growmemtuples;  /* memtuples' growth still underway? */
     220                 : 
     221                 :     /*
     222                 :      * Memory for tuples is sometimes allocated using a simple slab allocator,
     223                 :      * rather than with palloc().  Currently, we switch to slab allocation
     224                 :      * when we start merging.  Merging only needs to keep a small, fixed
     225                 :      * number of tuples in memory at any time, so we can avoid the
     226                 :      * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     227                 :      * to hold the tuples.
     228                 :      *
     229                 :      * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     230                 :      * slots.  The allocation is sized to have one slot per tape, plus one
     231                 :      * additional slot.  We need that many slots to hold all the tuples kept
      232                 :      * in the heap during merge, plus the one most recently returned from the
      233                 :      * sort by tuplesort_gettuple.
     234                 :      *
     235                 :      * Initially, all the slots are kept in a linked list of free slots.  When
      236                 :      * a tuple is read from a tape, it is put into the next available slot, if
     237                 :      * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     238                 :      * instead.
     239                 :      *
     240                 :      * When we're done processing a tuple, we return the slot back to the free
      241                 :      * list, or pfree() it if it was palloc'd.  We know that a tuple was
      242                 :      * allocated from the slab if its pointer value lies between
      243                 :      * slabMemoryBegin and slabMemoryEnd.
     244                 :      *
     245                 :      * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     246                 :      * tracking memory usage is not used.
     247                 :      */
     248                 :     bool        slabAllocatorUsed;
     249                 : 
     250                 :     char       *slabMemoryBegin;    /* beginning of slab memory arena */
     251                 :     char       *slabMemoryEnd;  /* end of slab memory arena */
     252                 :     SlabSlot   *slabFreeHead;   /* head of free list */
     253                 : 
     254                 :     /* Memory used for input and output tape buffers. */
     255                 :     size_t      tape_buffer_mem;
     256                 : 
     257                 :     /*
      258                 :      * When we return a tuple to the caller in tuplesort_gettuple_XXX that
      259                 :      * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
      260                 :      * modes), we remember the tuple in 'lastReturnedTuple', so that we can
      261                 :      * recycle the memory on the next gettuple call.
     262                 :      */
     263                 :     void       *lastReturnedTuple;
     264                 : 
     265                 :     /*
     266                 :      * While building initial runs, this is the current output run number.
     267                 :      * Afterwards, it is the number of initial runs we made.
     268                 :      */
     269                 :     int         currentRun;
     270                 : 
     271                 :     /*
     272                 :      * Logical tapes, for merging.
     273                 :      *
      274                 :      * The initial runs are written to the output tapes.  In each merge pass,
     275                 :      * the output tapes of the previous pass become the input tapes, and new
     276                 :      * output tapes are created as needed.  When nInputTapes equals
     277                 :      * nInputRuns, there is only one merge pass left.
     278                 :      */
     279                 :     LogicalTape **inputTapes;
     280                 :     int         nInputTapes;
     281                 :     int         nInputRuns;
     282                 : 
     283                 :     LogicalTape **outputTapes;
     284                 :     int         nOutputTapes;
     285                 :     int         nOutputRuns;
     286                 : 
     287                 :     LogicalTape *destTape;      /* current output tape */
     288                 : 
     289                 :     /*
     290                 :      * These variables are used after completion of sorting to keep track of
     291                 :      * the next tuple to return.  (In the tape case, the tape's current read
     292                 :      * position is also critical state.)
     293                 :      */
     294                 :     LogicalTape *result_tape;   /* actual tape of finished output */
     295                 :     int         current;        /* array index (only used if SORTEDINMEM) */
     296                 :     bool        eof_reached;    /* reached EOF (needed for cursors) */
     297                 : 
     298                 :     /* markpos_xxx holds marked position for mark and restore */
     299                 :     long        markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
     300                 :     int         markpos_offset; /* saved "current", or offset in tape block */
     301                 :     bool        markpos_eof;    /* saved "eof_reached" */
     302                 : 
     303                 :     /*
     304                 :      * These variables are used during parallel sorting.
     305                 :      *
      306                 :      * worker is our worker identifier.  By convention, a value of -1 denotes
      307                 :      * a leader tuplesort, and values >= 0 denote worker tuplesorts.  (-1 can
      308                 :      * also be a serial tuplesort.)
     309                 :      *
     310                 :      * shared is mutable shared memory state, which is used to coordinate
     311                 :      * parallel sorts.
     312                 :      *
     313                 :      * nParticipants is the number of worker Tuplesortstates known by the
     314                 :      * leader to have actually been launched, which implies that they must
     315                 :      * finish a run that the leader needs to merge.  Typically includes a
     316                 :      * worker state held by the leader process itself.  Set in the leader
     317                 :      * Tuplesortstate only.
     318                 :      */
     319                 :     int         worker;
     320                 :     Sharedsort *shared;
     321                 :     int         nParticipants;
     322                 : 
     323                 :     /*
     324                 :      * Additional state for managing "abbreviated key" sortsupport routines
     325                 :      * (which currently may be used by all cases except the hash index case).
     326                 :      * Tracks the intervals at which the optimization's effectiveness is
     327                 :      * tested.
     328                 :      */
     329                 :     int64       abbrevNext;     /* Tuple # at which to next check
     330                 :                                  * applicability */
     331                 : 
     332                 :     /*
     333                 :      * Resource snapshot for time of sort start.
     334                 :      */
     335                 : #ifdef TRACE_SORT
     336                 :     PGRUsage    ru_start;
     337                 : #endif
     338                 : };
     339                 : 
     340                 : /*
      341                 :  * Private mutable state of a parallel tuplesort operation.  This is allocated
     342                 :  * in shared memory.
     343                 :  */
     344 ECB             : struct Sharedsort
     345                 : {
     346                 :     /* mutex protects all fields prior to tapes */
     347                 :     slock_t     mutex;
     348                 : 
     349                 :     /*
     350                 :      * currentWorker generates ordinal identifier numbers for parallel sort
     351                 :      * workers.  These start from 0, and are always gapless.
     352                 :      *
     353                 :      * Workers increment workersFinished to indicate having finished.  If this
      354                 :  * is equal to state.nParticipants within the leader, the leader is ready to
     355                 :      * merge worker runs.
     356                 :      */
     357                 :     int         currentWorker;
     358                 :     int         workersFinished;
     359 EUB             : 
     360                 :     /* Temporary file space */
     361 ECB             :     SharedFileSet fileset;
     362                 : 
     363                 :     /* Size of tapes flexible array */
     364                 :     int         nTapes;
     365                 : 
     366                 :     /*
     367                 :      * Tapes array used by workers to report back information needed by the
     368                 :      * leader to concatenate all worker tapes into one for merging
     369                 :      */
     370                 :     TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
     371                 : };
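
A sketch of the identifier handout described above (simplified from
worker_get_identifier(), which is declared later in this file):

    static int
    sketch_get_worker_id(Sharedsort *shared)
    {
        int         worker;

        /* currentWorker hands out gapless ordinals, serialized by the mutex */
        SpinLockAcquire(&shared->mutex);
        worker = shared->currentWorker++;
        SpinLockRelease(&shared->mutex);

        return worker;
    }
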
     372                 : 
     373                 : /*
     374                 :  * Is the given tuple allocated from the slab memory arena?
     375                 :  */
     376                 : #define IS_SLAB_SLOT(state, tuple) \
     377                 :     ((char *) (tuple) >= (state)->slabMemoryBegin && \
     378                 :      (char *) (tuple) < (state)->slabMemoryEnd)
     379                 : 
     380                 : /*
     381                 :  * Return the given tuple to the slab memory free list, or free it
     382                 :  * if it was palloc'd.
     383                 :  */
     384                 : #define RELEASE_SLAB_SLOT(state, tuple) \
     385                 :     do { \
     386                 :         SlabSlot *buf = (SlabSlot *) tuple; \
     387                 :         \
     388                 :         if (IS_SLAB_SLOT((state), buf)) \
     389                 :         { \
     390                 :             buf->nextfree = (state)->slabFreeHead; \
     391                 :             (state)->slabFreeHead = buf; \
     392                 :         } else \
     393                 :             pfree(buf); \
     394                 :     } while(0)
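
RELEASE_SLAB_SLOT above shows the free side of the slab; a sketch of the
matching allocation path follows (illustrative only, not a function in
tuplesort.c; the real readtup routines do the equivalent inline):

    static void *
    sketch_slab_alloc(Tuplesortstate *state, Size tuplen)
    {
        /* Small tuples reuse a fixed-size slot from the free list... */
        if (tuplen <= SLAB_SLOT_SIZE && state->slabFreeHead != NULL)
        {
            SlabSlot   *slot = state->slabFreeHead;

            state->slabFreeHead = slot->nextfree;
            return slot;
        }

        /* ...oversized tuples fall back to palloc() and are later pfree'd. */
        return palloc(tuplen);
    }
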
     395                 : 
     396                 : #define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
     397                 : #define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
     398                 : #define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
     399                 : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
     400                 : #define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
     401                 : #define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
     402                 : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     403                 : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     404                 : #define SERIAL(state)       ((state)->shared == NULL)
     405                 : #define WORKER(state)       ((state)->shared && (state)->worker != -1)
     406                 : #define LEADER(state)       ((state)->shared && (state)->worker == -1)
     407                 : 
     408                 : /*
     409                 :  * NOTES about on-tape representation of tuples:
     410                 :  *
     411                 :  * We require the first "unsigned int" of a stored tuple to be the total size
     412                 :  * on-tape of the tuple, including itself (so it is never zero; an all-zero
     413                 :  * unsigned int is used to delimit runs).  The remainder of the stored tuple
     414                 :  * may or may not match the in-memory representation of the tuple ---
     415                 :  * any conversion needed is the job of the writetup and readtup routines.
     416                 :  *
     417                 :  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
     418                 :  * representation of the tuple must be followed by another "unsigned int" that
     419                 :  * is a copy of the length --- so the total tape space used is actually
     420                 :  * sizeof(unsigned int) more than the stored length value.  This allows
      421                 :  * read-backwards.  When the random access flag is not specified, the
     422                 :  * write/read routines may omit the extra length word.
     423                 :  *
     424                 :  * writetup is expected to write both length words as well as the tuple
     425                 :  * data.  When readtup is called, the tape is positioned just after the
     426                 :  * front length word; readtup must read the tuple data and advance past
     427                 :  * the back length word (if present).
     428                 :  *
     429                 :  * The write/read routines can make use of the tuple description data
     430                 :  * stored in the Tuplesortstate record, if needed.  They are also expected
     431                 :  * to adjust state->availMem by the amount of memory space (not tape space!)
     432                 :  * released or consumed.  There is no error return from either writetup
     433                 :  * or readtup; they should ereport() on failure.
     434                 :  *
     435                 :  *
     436                 :  * NOTES about memory consumption calculations:
     437                 :  *
     438                 :  * We count space allocated for tuples against the workMem limit, plus
     439                 :  * the space used by the variable-size memtuples array.  Fixed-size space
     440                 :  * is not counted; it's small enough to not be interesting.
     441                 :  *
     442                 :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     443                 :  * rather than the originally-requested size.  This is important since
     444                 :  * palloc can add substantial overhead.  It's not a complete answer since
     445                 :  * we won't count any wasted space in palloc allocation blocks, but it's
     446                 :  * a lot better than what we were doing before 7.3.  As of 9.6, a
      447                 :  * separate memory context is used for caller-passed tuples.  Resetting
     448                 :  * it at certain key increments significantly ameliorates fragmentation.
     449                 :  * readtup routines use the slab allocator (they cannot use
     450                 :  * the reset context because it gets deleted at the point that merging
     451                 :  * begins).
     452                 :  */
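
A sketch of the writetup shape these notes imply (illustrative, not one of
the real routines from tuplesortvariants.c; "body" and "bodylen" are
hypothetical, while LogicalTapeWrite() is the real logtape.c API):

    static void
    sketch_writetup(Tuplesortstate *state, LogicalTape *tape,
                    const void *body, unsigned int bodylen)
    {
        /* leading length word counts itself, so it is never zero */
        unsigned int tuplen = bodylen + sizeof(unsigned int);

        LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
        LogicalTapeWrite(tape, body, bodylen);

        /* trailing copy of the length word enables reading backwards */
        if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
            LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
    }
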
     453                 : 
     454                 : 
     455                 : static void tuplesort_begin_batch(Tuplesortstate *state);
     456                 : static bool consider_abort_common(Tuplesortstate *state);
     457                 : static void inittapes(Tuplesortstate *state, bool mergeruns);
     458                 : static void inittapestate(Tuplesortstate *state, int maxTapes);
     459                 : static void selectnewtape(Tuplesortstate *state);
     460                 : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
     461                 : static void mergeruns(Tuplesortstate *state);
     462                 : static void mergeonerun(Tuplesortstate *state);
     463                 : static void beginmerge(Tuplesortstate *state);
     464                 : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
     465                 : static void dumptuples(Tuplesortstate *state, bool alltuples);
     466                 : static void make_bounded_heap(Tuplesortstate *state);
     467                 : static void sort_bounded_heap(Tuplesortstate *state);
     468                 : static void tuplesort_sort_memtuples(Tuplesortstate *state);
     469                 : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
     470                 : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
     471                 : static void tuplesort_heap_delete_top(Tuplesortstate *state);
     472                 : static void reversedirection(Tuplesortstate *state);
     473                 : static unsigned int getlen(LogicalTape *tape, bool eofOK);
     474                 : static void markrunend(LogicalTape *tape);
     475                 : static int  worker_get_identifier(Tuplesortstate *state);
     476                 : static void worker_freeze_result_tape(Tuplesortstate *state);
     477                 : static void worker_nomergeruns(Tuplesortstate *state);
     478                 : static void leader_takeover_tapes(Tuplesortstate *state);
     479                 : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
     480                 : static void tuplesort_free(Tuplesortstate *state);
     481                 : static void tuplesort_updatemax(Tuplesortstate *state);
     482                 : 
     483                 : /*
     484                 :  * Specialized comparators that we can inline into specialized sorts.  The goal
     485                 :  * is to try to sort two tuples without having to follow the pointers to the
     486                 :  * comparator or the tuple.
     487                 :  *
     488                 :  * XXX: For now, these fall back to comparator functions that will compare the
     489 EUB             :  * leading datum a second time.
     490                 :  *
     491                 :  * XXX: For now, there is no specialization for cases where datum1 is
     492 ECB             :  * authoritative and we don't even need to fall back to a callback at all (that
     493                 :  * would be true for types like int4/int8/timestamp/date, but not true for
      494                 :  * abbreviations of text or multi-key sorts).  There could be!  Is it worth it?
     495                 :  */
     496                 : 
     497                 : /* Used if first key's comparator is ssup_datum_unsigned_compare */
     498                 : static pg_attribute_always_inline int
     499 GIC    22722154 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     500                 : {
     501                 :     int         compare;
     502 ECB             : 
     503 CBC    22722154 :     compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
     504        22722154 :                                           b->datum1, b->isnull1,
     505                 :                                           &state->base.sortKeys[0]);
     506 GIC    22722154 :     if (compare != 0)
     507        20647602 :         return compare;
     508                 : 
     509                 :     /*
     510 ECB             :      * No need to waste effort calling the tiebreak function when there are no
     511                 :      * other keys to sort on.
     512                 :      */
     513 GNC     2074552 :     if (state->base.onlyKey != NULL)
     514 UIC           0 :         return 0;
     515                 : 
     516 GNC     2074552 :     return state->base.comparetup(a, b, state);
     517 ECB             : }
     518                 : 
     519                 : #if SIZEOF_DATUM >= 8
     520                 : /* Used if first key's comparator is ssup_datum_signed_compare */
     521                 : static pg_attribute_always_inline int
     522 GIC     2811465 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     523 ECB             : {
     524                 :     int         compare;
     525                 : 
     526 CBC     2811465 :     compare = ApplySignedSortComparator(a->datum1, a->isnull1,
     527         2811465 :                                         b->datum1, b->isnull1,
     528                 :                                         &state->base.sortKeys[0]);
     529                 : 
     530         2811465 :     if (compare != 0)
     531 GIC     2806238 :         return compare;
     532                 : 
     533 ECB             :     /*
     534                 :      * No need to waste effort calling the tiebreak function when there are no
     535                 :      * other keys to sort on.
     536                 :      */
     537 GNC        5227 :     if (state->base.onlyKey != NULL)
     538 GIC         521 :         return 0;
     539                 : 
     540 GNC        4706 :     return state->base.comparetup(a, b, state);
     541 ECB             : }
     542                 : #endif
     543                 : 
     544                 : /* Used if first key's comparator is ssup_datum_int32_compare */
     545                 : static pg_attribute_always_inline int
     546 CBC    26236863 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     547                 : {
     548 ECB             :     int         compare;
     549                 : 
     550 GIC    26236863 :     compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
     551        26236863 :                                        b->datum1, b->isnull1,
     552                 :                                        &state->base.sortKeys[0]);
     553                 : 
     554        26236863 :     if (compare != 0)
     555        18709011 :         return compare;
     556                 : 
     557                 :     /*
     558                 :      * No need to waste effort calling the tiebreak function when there are no
     559 ECB             :      * other keys to sort on.
     560                 :      */
     561 GNC     7527852 :     if (state->base.onlyKey != NULL)
     562 GIC      839021 :         return 0;
     563 ECB             : 
     564 GNC     6688831 :     return state->base.comparetup(a, b, state);
     565                 : }
     566                 : 
     567                 : /*
     568                 :  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
     569                 :  * any variant of SortTuples, using the appropriate comparetup function.
     570                 :  * qsort_ssup() is specialized for the case where the comparetup function
     571                 :  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
     572                 :  * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
     573                 :  * common comparison functions on pass-by-value leading datums.
     574                 :  */
     575                 : 
     576                 : #define ST_SORT qsort_tuple_unsigned
     577                 : #define ST_ELEMENT_TYPE SortTuple
     578 ECB             : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
     579                 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     580                 : #define ST_CHECK_FOR_INTERRUPTS
     581                 : #define ST_SCOPE static
     582                 : #define ST_DEFINE
     583                 : #include "lib/sort_template.h"
     584                 : 
     585                 : #if SIZEOF_DATUM >= 8
     586                 : #define ST_SORT qsort_tuple_signed
     587                 : #define ST_ELEMENT_TYPE SortTuple
     588                 : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
     589                 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     590                 : #define ST_CHECK_FOR_INTERRUPTS
     591                 : #define ST_SCOPE static
     592                 : #define ST_DEFINE
     593                 : #include "lib/sort_template.h"
     594                 : #endif
     595                 : 
     596                 : #define ST_SORT qsort_tuple_int32
     597                 : #define ST_ELEMENT_TYPE SortTuple
     598                 : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
     599                 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     600                 : #define ST_CHECK_FOR_INTERRUPTS
     601                 : #define ST_SCOPE static
     602                 : #define ST_DEFINE
     603                 : #include "lib/sort_template.h"
     604                 : 
     605                 : #define ST_SORT qsort_tuple
     606 EUB             : #define ST_ELEMENT_TYPE SortTuple
     607                 : #define ST_COMPARE_RUNTIME_POINTER
     608                 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     609                 : #define ST_CHECK_FOR_INTERRUPTS
     610 ECB             : #define ST_SCOPE static
     611                 : #define ST_DECLARE
     612                 : #define ST_DEFINE
     613                 : #include "lib/sort_template.h"
     614                 : 
     615                 : #define ST_SORT qsort_ssup
     616                 : #define ST_ELEMENT_TYPE SortTuple
     617                 : #define ST_COMPARE(a, b, ssup) \
     618 EUB             :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     619                 :                         (b)->datum1, (b)->isnull1, (ssup))
     620 ECB             : #define ST_COMPARE_ARG_TYPE SortSupportData
     621                 : #define ST_CHECK_FOR_INTERRUPTS
     622                 : #define ST_SCOPE static
     623                 : #define ST_DEFINE
     624                 : #include "lib/sort_template.h"
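
The dispatch that selects among the specializations generated above lives in
tuplesort_sort_memtuples(), further down the file and outside this excerpt.
A sketch of its shape (the ssup_datum_*_cmp names are the sortsupport
comparators these specializations key on):

    static void
    sketch_sort_memtuples(Tuplesortstate *state)
    {
        if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
            qsort_tuple_unsigned(state->memtuples, state->memtupcount, state);
    #if SIZEOF_DATUM >= 8
        else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
            qsort_tuple_signed(state->memtuples, state->memtupcount, state);
    #endif
        else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
            qsort_tuple_int32(state->memtuples, state->memtupcount, state);
        else if (state->base.onlyKey != NULL)
            qsort_ssup(state->memtuples, state->memtupcount, state->base.onlyKey);
        else
            qsort_tuple(state->memtuples, state->memtupcount,
                        state->base.comparetup, state);
    }
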
     625                 : 
     626                 : /*
     627                 :  *      tuplesort_begin_xxx
     628                 :  *
     629                 :  * Initialize for a tuple sort operation.
     630                 :  *
     631                 :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     632                 :  * zero or more times, then call tuplesort_performsort when all the tuples
     633                 :  * have been supplied.  After performsort, retrieve the tuples in sorted
     634                 :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     635                 :  * access was requested, rescan, markpos, and restorepos can also be called.)
     636                 :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     637                 :  *
     638                 :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     639                 :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     640                 :  * (The normal value of this parameter is work_mem, but some callers use
     641                 :  * other values.)  Each variant also has a sortopt which is a bitmask of
     642                 :  * sort options.  See TUPLESORT_* definitions in tuplesort.h
     643                 :  */
     644                 : 
     645                 : Tuplesortstate *
     646 GIC      189815 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     647                 : {
     648 ECB             :     Tuplesortstate *state;
     649                 :     MemoryContext maincontext;
     650                 :     MemoryContext sortcontext;
     651                 :     MemoryContext oldcontext;
     652                 : 
     653                 :     /* See leader_takeover_tapes() remarks on random access support */
     654 CBC      189815 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     655 UIC           0 :         elog(ERROR, "random access disallowed under parallel sort");
     656                 : 
     657 ECB             :     /*
     658 EUB             :      * Memory context surviving tuplesort_reset.  This memory context holds
     659                 :      * data which is useful to keep while sorting multiple similar batches.
     660                 :      */
     661 GIC      189815 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     662                 :                                         "TupleSort main",
     663                 :                                         ALLOCSET_DEFAULT_SIZES);
     664                 : 
     665                 :     /*
     666                 :      * Create a working memory context for one sort operation.  The content of
     667 ECB             :      * this context is deleted by tuplesort_reset.
     668 EUB             :      */
     669 GIC      189815 :     sortcontext = AllocSetContextCreate(maincontext,
     670 ECB             :                                         "TupleSort sort",
     671                 :                                         ALLOCSET_DEFAULT_SIZES);
     672                 : 
     673                 :     /*
     674                 :      * Additionally a working memory context for tuples is setup in
     675                 :      * tuplesort_begin_batch.
     676                 :      */
     677                 : 
     678                 :     /*
     679                 :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     680                 :      * don't need a separate pfree() operation for it at shutdown.
     681                 :      */
     682 GIC      189815 :     oldcontext = MemoryContextSwitchTo(maincontext);
     683 ECB             : 
     684 CBC      189815 :     state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
     685                 : 
     686                 : #ifdef TRACE_SORT
     687 GIC      189815 :     if (trace_sort)
     688 UIC           0 :         pg_rusage_init(&state->ru_start);
     689                 : #endif
     690                 : 
     691 GNC      189815 :     state->base.sortopt = sortopt;
     692          189815 :     state->base.tuples = true;
     693          189815 :     state->abbrevNext = 10;
     694 ECB             : 
     695                 :     /*
     696                 :      * workMem is forced to be at least 64KB, the current minimum valid value
     697                 :      * for the work_mem GUC.  This is a defense against parallel sort callers
     698                 :      * that divide out memory among many workers in a way that leaves each
     699                 :      * with very little memory.
     700                 :      */
     701 GIC      189815 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     702 GNC      189815 :     state->base.sortcontext = sortcontext;
     703          189815 :     state->base.maincontext = maincontext;
     704                 : 
     705 ECB             :     /*
     706                 :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     707                 :      * see comments in grow_memtuples().
     708                 :      */
     709 GIC      189815 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     710          189815 :     state->memtuples = NULL;
     711                 : 
     712                 :     /*
      713                 :      * After all of the other non-parallel-related state, we set up all of the
     714                 :      * state needed for each batch.
     715                 :      */
     716 CBC      189815 :     tuplesort_begin_batch(state);
     717                 : 
     718                 :     /*
     719                 :      * Initialize parallel-related state based on coordination information
     720                 :      * from caller
     721                 :      */
     722 GIC      189815 :     if (!coordinate)
     723                 :     {
     724                 :         /* Serial sort */
     725          189506 :         state->shared = NULL;
     726          189506 :         state->worker = -1;
     727          189506 :         state->nParticipants = -1;
     728                 :     }
     729 CBC         309 :     else if (coordinate->isWorker)
     730 ECB             :     {
     731                 :         /* Parallel worker produces exactly one final run from all input */
     732 GIC         206 :         state->shared = coordinate->sharedsort;
     733 CBC         206 :         state->worker = worker_get_identifier(state);
     734 GIC         206 :         state->nParticipants = -1;
     735 EUB             :     }
     736                 :     else
     737                 :     {
     738                 :         /* Parallel leader state only used for final merge */
     739 GIC         103 :         state->shared = coordinate->sharedsort;
     740 GBC         103 :         state->worker = -1;
     741 GIC         103 :         state->nParticipants = coordinate->nParticipants;
     742             103 :         Assert(state->nParticipants >= 1);
     743                 :     }
     744                 : 
     745          189815 :     MemoryContextSwitchTo(oldcontext);
     746                 : 
     747          189815 :     return state;
     748                 : }
     749                 : 
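
A minimal sketch of the call sequence documented above (not part of
tuplesort.c; it uses the Datum-sort variant from tuplesortvariants.c to sort
two int4 values; note that tuplesort_getdatum()'s exact argument list has
varied across releases, so treat the 'copy' flag here as version-specific):

    static void
    sketch_sort_two_ints(void)
    {
        Tuplesortstate *sortstate;
        Datum       val;
        bool        isnull;

        sortstate = tuplesort_begin_datum(INT4OID, Int4LessOperator,
                                          InvalidOid, false,
                                          work_mem, NULL, TUPLESORT_NONE);

        tuplesort_putdatum(sortstate, Int32GetDatum(42), false);
        tuplesort_putdatum(sortstate, Int32GetDatum(7), false);

        tuplesort_performsort(sortstate);

        while (tuplesort_getdatum(sortstate, true, false, &val, &isnull, NULL))
            ;                   /* tuples arrive sorted: 7, then 42 */

        tuplesort_end(sortstate);
    }
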
     750                 : /*
     751                 :  *      tuplesort_begin_batch
     752                 :  *
      753                 :  * Set up, or reset, all state needed for processing a new set of tuples with this
     754                 :  * sort state. Called both from tuplesort_begin_common (the first time sorting
     755 ECB             :  * with this sort state) and tuplesort_reset (for subsequent usages).
     756                 :  */
     757                 : static void
     758 GIC      191035 : tuplesort_begin_batch(Tuplesortstate *state)
     759                 : {
     760                 :     MemoryContext oldcontext;
     761 ECB             : 
     762 GNC      191035 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     763                 : 
     764                 :     /*
     765                 :      * Caller tuple (e.g. IndexTuple) memory context.
     766                 :      *
      767                 :      * A dedicated child context used exclusively for caller-passed tuples
     768                 :      * eases memory management.  Resetting at key points reduces
     769                 :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     770                 :      * in the parent context, not this context, because there is no need to
     771                 :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
     772                 :      * order, so we use a regular aset.c context so that it can make use of
     773                 :      * free'd memory.  When the sort is not bounded, we make use of a
     774 ECB             :      * generation.c context as this keeps allocations more compact with less
     775                 :      * wastage.  Allocations are also slightly more CPU efficient.
     776                 :      */
     777 GNC      191035 :     if (state->base.sortopt & TUPLESORT_ALLOWBOUNDED)
     778             924 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     779                 :                                                          "Caller tuples",
     780                 :                                                          ALLOCSET_DEFAULT_SIZES);
     781                 :     else
     782          190111 :         state->base.tuplecontext = GenerationContextCreate(state->base.sortcontext,
     783                 :                                                            "Caller tuples",
     784                 :                                                            ALLOCSET_DEFAULT_SIZES);
     785                 : 
     786                 : 
     787 GIC      191035 :     state->status = TSS_INITIAL;
     788          191035 :     state->bounded = false;
     789          191035 :     state->boundUsed = false;
     790                 : 
     791 CBC      191035 :     state->availMem = state->allowedMem;
     792                 : 
     793 GIC      191035 :     state->tapeset = NULL;
     794                 : 
     795          191035 :     state->memtupcount = 0;
     796                 : 
     797                 :     /*
     798                 :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     799                 :      * see comments in grow_memtuples().
     800                 :      */
     801          191035 :     state->growmemtuples = true;
     802          191035 :     state->slabAllocatorUsed = false;
     803          191035 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     804                 :     {
     805 LBC           0 :         pfree(state->memtuples);
     806 UIC           0 :         state->memtuples = NULL;
     807 UBC           0 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     808 EUB             :     }
     809 GIC      191035 :     if (state->memtuples == NULL)
     810                 :     {
     811          189815 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     812 CBC      189815 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     813 ECB             :     }
     814                 : 
     815                 :     /* workMem must be large enough for the minimal memtuples array */
     816 GIC      191035 :     if (LACKMEM(state))
     817 UIC           0 :         elog(ERROR, "insufficient memory allowed for sort");
     818                 : 
     819 GIC      191035 :     state->currentRun = 0;
     820                 : 
     821                 :     /*
     822                 :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     823                 :      * inittapes(), if needed.
     824 ECB             :      */
     825                 : 
     826 GIC      191035 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     827 ECB             : 
     828 CBC      191035 :     MemoryContextSwitchTo(oldcontext);
     829          191035 : }
     830                 : 
     831                 : /*
     832                 :  * tuplesort_set_bound
     833 ECB             :  *
     834                 :  *  Advise tuplesort that at most the first N result tuples are required.
     835 EUB             :  *
     836                 :  * Must be called before inserting any tuples.  (Actually, we could allow it
     837                 :  * as long as the sort hasn't spilled to disk, but there seems no need for
     838                 :  * delayed calls at the moment.)
     839                 :  *
     840                 :  * This is a hint only. The tuplesort may still return more tuples than
     841                 :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     842                 :  */
     843                 : void
     844 GIC         857 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     845 ECB             : {
     846                 :     /* Assert we're called before loading any tuples */
     847 GIC         857 :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     848                 :     /* Assert we allow bounded sorts */
     849 GNC         857 :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     850                 :     /* Can't set the bound twice, either */
     851 GIC         857 :     Assert(!state->bounded);
     852                 :     /* Also, this shouldn't be called in a parallel worker */
     853             857 :     Assert(!WORKER(state));
     854                 : 
     855                 :     /* Parallel leader allows but ignores hint */
     856 CBC         857 :     if (LEADER(state))
     857 UIC           0 :         return;
     858                 : 
     859                 : #ifdef DEBUG_BOUNDED_SORT
     860                 :     /* Honor GUC setting that disables the feature (for easy testing) */
     861                 :     if (!optimize_bounded_sort)
     862 ECB             :         return;
     863                 : #endif
     864                 : 
     865                 :     /* We want to be able to compute bound * 2, so limit the setting */
     866 CBC         857 :     if (bound > (int64) (INT_MAX / 2))
     867 LBC           0 :         return;
     868 ECB             : 
     869 CBC         857 :     state->bounded = true;
     870 GIC         857 :     state->bound = (int) bound;
     871 ECB             : 
     872                 :     /*
     873                 :      * Bounded sorts are not an effective target for abbreviated key
     874                 :      * optimization.  Disable by setting state to be consistent with no
     875                 :      * abbreviation support.
     876                 :      */
     877 GNC         857 :     state->base.sortKeys->abbrev_converter = NULL;
     878             857 :     if (state->base.sortKeys->abbrev_full_comparator)
     879              35 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     880                 : 
     881                 :     /* Not strictly necessary, but be tidy */
     882             857 :     state->base.sortKeys->abbrev_abort = NULL;
     883             857 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     884 EUB             : }
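                          :
                          : /*
                          :  * A minimal usage sketch (not part of tuplesort.c; assumes the
                          :  * PostgreSQL 15-era tuplesort_begin_heap() signature): a caller that
                          :  * wants only the first N sorted tuples passes TUPLESORT_ALLOWBOUNDED at
                          :  * begin time and advises the bound before inserting anything.  Every
                          :  * argument other than the sortopt flag and the bound is a placeholder
                          :  * for a value the caller already has at hand.
                          :  */
                          : #ifdef TUPLESORT_SET_BOUND_SKETCH
                          : static Tuplesortstate *
                          : begin_top_n_sort(TupleDesc tupDesc, int nkeys, AttrNumber *attNums,
                          :                  Oid *sortOperators, Oid *sortCollations,
                          :                  bool *nullsFirstFlags, int64 n_wanted)
                          : {
                          :     Tuplesortstate *state;
                          :
                          :     state = tuplesort_begin_heap(tupDesc, nkeys, attNums,
                          :                                  sortOperators, sortCollations,
                          :                                  nullsFirstFlags, work_mem,
                          :                                  NULL,    /* no parallel coordinator */
                          :                                  TUPLESORT_ALLOWBOUNDED);
                          :     /* Hint only: the sort may still return more than n_wanted tuples. */
                          :     tuplesort_set_bound(state, n_wanted);
                          :     return state;
                          : }
                          : #endif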
     885                 : 
     886 ECB             : /*
     887                 :  * tuplesort_used_bound
     888                 :  *
     889                 :  * Allow callers to find out if the sort state was able to use a bound.
     890                 :  */
     891 EUB             : bool
     892 GIC          51 : tuplesort_used_bound(Tuplesortstate *state)
     893                 : {
     894              51 :     return state->boundUsed;
     895                 : }
     896                 : 
     897 ECB             : /*
     898                 :  * tuplesort_free
     899                 :  *
     900                 :  *  Internal routine for freeing resources of tuplesort.
     901                 :  */
     902                 : static void
     903 CBC      190945 : tuplesort_free(Tuplesortstate *state)
     904                 : {
     905 ECB             :     /* context swap probably not needed, but let's be safe */
     906 GNC      190945 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     907                 : 
     908                 : #ifdef TRACE_SORT
     909                 :     long        spaceUsed;
     910 ECB             : 
     911 CBC      190945 :     if (state->tapeset)
     912             334 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     913                 :     else
     914 GIC      190611 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     915                 : #endif
     916                 : 
     917                 :     /*
     918 ECB             :      * Delete temporary "tape" files, if any.
     919                 :      *
     920                 :      * Note: want to include this in reported total cost of sort, hence need
     921                 :      * for two #ifdef TRACE_SORT sections.
     922                 :      *
     923                 :      * We don't bother to destroy the individual tapes here. They will go away
     924                 :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     925                 :      * finished tapes already.)
     926                 :      */
     927 GBC      190945 :     if (state->tapeset)
     928 GIC         334 :         LogicalTapeSetClose(state->tapeset);
     929 ECB             : 
     930                 : #ifdef TRACE_SORT
     931 CBC      190945 :     if (trace_sort)
     932                 :     {
     933 UIC           0 :         if (state->tapeset)
     934               0 :             elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
     935                 :                  SERIAL(state) ? "external sort" : "parallel external sort",
     936                 :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     937                 :         else
     938 LBC           0 :             elog(LOG, "%s of worker %d ended, %ld KB used: %s",
     939                 :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     940 ECB             :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     941                 :     }
     942                 : 
     943                 :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     944                 : #else
     945                 : 
     946                 :     /*
     947                 :      * If you disabled TRACE_SORT, you can still probe sort__done, but you
     948                 :      * ain't getting space-used stats.
     949                 :      */
     950                 :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
     951                 : #endif
     952                 : 
     953 GNC      190945 :     FREESTATE(state);
     954 CBC      190945 :     MemoryContextSwitchTo(oldcontext);
     955                 : 
     956 ECB             :     /*
     957 EUB             :      * Free the per-sort memory context, thereby releasing all working memory.
     958 ECB             :      */
     959 GNC      190945 :     MemoryContextReset(state->base.sortcontext);
     960 CBC      190945 : }
     961                 : 
     962                 : /*
     963                 :  * tuplesort_end
     964                 :  *
     965                 :  *  Release resources and clean up.
     966                 :  *
     967                 :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     968 ECB             :  * pointing to garbage.  Be careful not to attempt to use or free such
     969                 :  * pointers afterwards!
     970                 :  */
     971 EUB             : void
     972 CBC      189725 : tuplesort_end(Tuplesortstate *state)
     973 EUB             : {
     974 CBC      189725 :     tuplesort_free(state);
     975                 : 
     976                 :     /*
     977                 :      * Free the main memory context, including the Tuplesortstate struct
     978                 :      * itself.
     979 ECB             :      */
     980 GNC      189725 :     MemoryContextDelete(state->base.maincontext);
     981 CBC      189725 : }
     982                 : 
     983                 : /*
     984                 :  * tuplesort_updatemax
     985                 :  *
     986                 :  *  Update maximum resource usage statistics.
     987                 :  */
     988                 : static void
     989 GIC        1400 : tuplesort_updatemax(Tuplesortstate *state)
     990 ECB             : {
     991                 :     int64       spaceUsed;
     992                 :     bool        isSpaceDisk;
     993 EUB             : 
     994                 :     /*
     995                 :      * Note: it might seem we should provide both memory and disk usage for a
     996 ECB             :      * disk-based sort.  However, the current code doesn't track memory space
     997                 :      * accurately once we have begun to return tuples to the caller (since we
     998                 :      * don't account for pfree's the caller is expected to do), so we cannot
     999                 :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    1000                 :      * to fix.  Is it worth creating an API for the memory context code to
    1001                 :      * tell us how much is actually used in sortcontext?
    1002                 :      */
    1003 CBC        1400 :     if (state->tapeset)
    1004                 :     {
    1005 LBC           0 :         isSpaceDisk = true;
    1006 UBC           0 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
    1007 ECB             :     }
    1008                 :     else
    1009                 :     {
    1010 GIC        1400 :         isSpaceDisk = false;
    1011            1400 :         spaceUsed = state->allowedMem - state->availMem;
    1012                 :     }
    1013 ECB             : 
    1014                 :     /*
     1015                 :      * The sort evicts data to disk when it was not able to fit that data in
     1016                 :      * main memory.  This is why we consider space used on disk to be more
     1017                 :      * important for tracking resource usage than space used in memory.  Note
     1018                 :      * that the amount of space occupied by a set of tuples on disk might be
     1019                 :      * less than the amount of space occupied by the same tuples in memory,
     1020                 :      * due to the more compact on-disk representation.
    1021                 :      */
    1022 GIC        1400 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
    1023            1400 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
    1024                 :     {
    1025             188 :         state->maxSpace = spaceUsed;
    1026 CBC         188 :         state->isMaxSpaceDisk = isSpaceDisk;
    1027 GIC         188 :         state->maxSpaceStatus = state->status;
    1028 ECB             :     }
    1029 CBC        1400 : }
    1030                 : 
    1031                 : /*
    1032                 :  * tuplesort_reset
    1033                 :  *
    1034                 :  *  Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
    1035 ECB             :  *  meta-information in.  After tuplesort_reset, tuplesort is ready to start
     1036                 :  *  a new sort.  This allows tuplesort states to be reused rather than
     1037                 :  *  recreated, saving resources, when sorting multiple small batches.
    1038                 :  */
    1039                 : void
    1040 GIC        1220 : tuplesort_reset(Tuplesortstate *state)
    1041 ECB             : {
    1042 GIC        1220 :     tuplesort_updatemax(state);
    1043            1220 :     tuplesort_free(state);
    1044                 : 
    1045                 :     /*
    1046                 :      * After we've freed up per-batch memory, re-setup all of the state common
    1047 ECB             :      * to both the first batch and any subsequent batch.
    1048                 :      */
    1049 GIC        1220 :     tuplesort_begin_batch(state);
    1050                 : 
    1051            1220 :     state->lastReturnedTuple = NULL;
    1052            1220 :     state->slabMemoryBegin = NULL;
    1053 CBC        1220 :     state->slabMemoryEnd = NULL;
    1054 GIC        1220 :     state->slabFreeHead = NULL;
    1055            1220 : }
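                          :
                          : /*
                          :  * An illustrative reuse pattern (hypothetical caller, not code from this
                          :  * file): sorting many small batches with a single Tuplesortstate.  The
                          :  * load_batch()/drain_batch() helpers stand in for whatever the caller
                          :  * does with the puttuple/gettuple routines of its tuplesort variant.
                          :  */
                          : #ifdef TUPLESORT_RESET_SKETCH
                          : static void
                          : sort_all_batches(Tuplesortstate *state, int nbatches)
                          : {
                          :     for (int batch = 0; batch < nbatches; batch++)
                          :     {
                          :         load_batch(state, batch);   /* insert this batch's tuples */
                          :         tuplesort_performsort(state);
                          :         drain_batch(state, batch);  /* read back the sorted tuples */
                          :
                          :         /* Keep the state and its metadata; free only per-batch memory. */
                          :         tuplesort_reset(state);
                          :     }
                          :     tuplesort_end(state);
                          : }
                          : #endif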
    1056                 : 
    1057                 : /*
    1058                 :  * Grow the memtuples[] array, if possible within our memory constraint.  We
    1059 ECB             :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
    1060                 :  * limit.  Return true if we were able to enlarge the array, false if not.
    1061                 :  *
    1062                 :  * Normally, at each increment we double the size of the array.  When doing
    1063                 :  * that would exceed a limit, we attempt one last, smaller increase (and then
    1064                 :  * clear the growmemtuples flag so we don't try any more).  That allows us to
    1065                 :  * use memory as fully as permitted; sticking to the pure doubling rule could
    1066                 :  * result in almost half going unused.  Because availMem moves around with
    1067                 :  * tuple addition/removal, we need some rule to prevent making repeated small
    1068                 :  * increases in memtupsize, which would just be useless thrashing.  The
    1069                 :  * growmemtuples flag accomplishes that and also prevents useless
    1070                 :  * recalculations in this function.
    1071                 :  */
    1072                 : static bool
    1073 CBC        5244 : grow_memtuples(Tuplesortstate *state)
    1074                 : {
    1075 EUB             :     int         newmemtupsize;
    1076 GBC        5244 :     int         memtupsize = state->memtupsize;
    1077 GIC        5244 :     int64       memNowUsed = state->allowedMem - state->availMem;
    1078                 : 
    1079                 :     /* Forget it if we've already maxed out memtuples, per comment above */
    1080            5244 :     if (!state->growmemtuples)
    1081              40 :         return false;
    1082                 : 
    1083                 :     /* Select new value of memtupsize */
    1084            5204 :     if (memNowUsed <= state->availMem)
    1085                 :     {
    1086                 :         /*
    1087                 :          * We've used no more than half of allowedMem; double our usage,
    1088 ECB             :          * clamping at INT_MAX tuples.
    1089                 :          */
    1090 GIC        5154 :         if (memtupsize < INT_MAX / 2)
    1091            5154 :             newmemtupsize = memtupsize * 2;
    1092                 :         else
    1093                 :         {
    1094 UIC           0 :             newmemtupsize = INT_MAX;
    1095               0 :             state->growmemtuples = false;
    1096 ECB             :         }
    1097                 :     }
    1098                 :     else
    1099                 :     {
    1100                 :         /*
    1101                 :          * This will be the last increment of memtupsize.  Abandon doubling
    1102                 :          * strategy and instead increase as much as we safely can.
    1103                 :          *
    1104                 :          * To stay within allowedMem, we can't increase memtupsize by more
    1105                 :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
    1106                 :          * to increase it by considerably less, because we need to leave some
    1107                 :          * space for the tuples to which the new array slots will refer.  We
    1108 EUB             :          * assume the new tuples will be about the same size as the tuples
    1109                 :          * we've already seen, and thus we can extrapolate from the space
    1110                 :          * consumption so far to estimate an appropriate new size for the
    1111                 :          * memtuples array.  The optimal value might be higher or lower than
    1112                 :          * this estimate, but it's hard to know that in advance.  We again
    1113                 :          * clamp at INT_MAX tuples.
    1114                 :          *
    1115                 :          * This calculation is safe against enlarging the array so much that
    1116                 :          * LACKMEM becomes true, because the memory currently used includes
    1117                 :          * the present array; thus, there would be enough allowedMem for the
    1118                 :          * new array elements even if no other memory were currently used.
    1119                 :          *
    1120                 :          * We do the arithmetic in float8, because otherwise the product of
    1121 ECB             :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
    1122                 :          * result should be insignificant; but even if we computed a
    1123                 :          * completely insane result, the checks below will prevent anything
    1124                 :          * really bad from happening.
    1125                 :          */
    1126                 :         double      grow_ratio;
    1127                 : 
    1128 CBC          50 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1129              50 :         if (memtupsize * grow_ratio < INT_MAX)
    1130 GIC          50 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1131                 :         else
    1132 UIC           0 :             newmemtupsize = INT_MAX;
    1133 ECB             : 
    1134                 :         /* We won't make any further enlargement attempts */
    1135 GBC          50 :         state->growmemtuples = false;
    1136 EUB             :     }
    1137                 : 
    1138 ECB             :     /* Must enlarge array by at least one element, else report failure */
    1139 GIC        5204 :     if (newmemtupsize <= memtupsize)
    1140 LBC           0 :         goto noalloc;
    1141 ECB             : 
    1142                 :     /*
    1143 EUB             :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1144                 :      * to ensure our request won't be rejected.  Note that we can easily
    1145                 :      * exhaust address space before facing this outcome.  (This is presently
    1146                 :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1147                 :      * don't rely on that at this distance.)
    1148                 :      */
    1149 GIC        5204 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1150                 :     {
    1151 UIC           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1152               0 :         state->growmemtuples = false;    /* can't grow any more */
    1153                 :     }
    1154                 : 
    1155                 :     /*
    1156 ECB             :      * We need to be sure that we do not cause LACKMEM to become true, else
    1157                 :      * the space management algorithm will go nuts.  The code above should
    1158                 :      * never generate a dangerous request, but to be safe, check explicitly
    1159                 :      * that the array growth fits within availMem.  (We could still cause
    1160                 :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1161                 :      * array were to increase.  That shouldn't happen because we chose the
    1162                 :      * initial array size large enough to ensure that palloc will be treating
    1163                 :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1164                 :      * explicitly below just in case.)
    1165                 :      */
    1166 GIC        5204 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1167 UIC           0 :         goto noalloc;
    1168                 : 
    1169                 :     /* OK, do it */
    1170 GIC        5204 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1171            5204 :     state->memtupsize = newmemtupsize;
    1172            5204 :     state->memtuples = (SortTuple *)
    1173            5204 :         repalloc_huge(state->memtuples,
    1174            5204 :                       state->memtupsize * sizeof(SortTuple));
    1175            5204 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1176            5204 :     if (LACKMEM(state))
    1177 UIC           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1178 GIC        5204 :     return true;
    1179                 : 
    1180 UIC           0 : noalloc:
    1181 ECB             :     /* If for any reason we didn't realloc, shut off future attempts */
    1182 UIC           0 :     state->growmemtuples = false;
    1183               0 :     return false;
    1184                 : }
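                          :
                          : /*
                          :  * A worked example of the final, non-doubling growth step (illustrative
                          :  * numbers): suppose allowedMem is 64MB and the array fills when
                          :  * memNowUsed is 48MB.  Then grow_ratio = 64/48, about 1.33, so a
                          :  * 1,000,000-slot memtuples array grows to roughly 1,333,333 slots
                          :  * instead of 2,000,000, and growmemtuples is cleared so that no further
                          :  * enlargement is attempted.
                          :  */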
    1185                 : 
    1186                 : /*
    1187                 :  * Shared code for tuple and datum cases.
    1188                 :  */
    1189                 : void
    1190 GNC    19235664 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbrev)
    1191                 : {
    1192        19235664 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1193 ECB             : 
    1194 GNC    19235664 :     Assert(!LEADER(state));
    1195                 : 
    1196                 :     /* Count the size of the out-of-line data */
    1197        19235664 :     if (tuple->tuple != NULL)
    1198        18327022 :         USEMEM(state, GetMemoryChunkSpace(tuple->tuple));
    1199                 : 
    1200        19235664 :     if (!useAbbrev)
    1201                 :     {
    1202                 :         /*
     1203                 :          * Leave the ordinary Datum representation, or the NULL value.  If
     1204 EUB             :          * there is a converter it won't expect NULL values, and the cost
     1205                 :          * model is not required to account for NULLs, so in that case we
     1206                 :          * avoid calling the converter and just set datum1 to a zeroed
     1207                 :          * representation (to be consistent, and to support cheap inequality
     1208                 :          * tests for NULL abbreviated keys).
    1209 ECB             :          */
    1210                 :     }
    1211 CBC     2367567 :     else if (!consider_abort_common(state))
    1212                 :     {
    1213                 :         /* Store abbreviated key representation */
    1214 GNC     2367490 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1215                 :                                                                state->base.sortKeys);
    1216                 :     }
    1217                 :     else
    1218                 :     {
    1219 ECB             :         /*
    1220                 :          * Set state to be consistent with never trying abbreviation.
    1221                 :          *
    1222                 :          * Alter datum1 representation in already-copied tuples, so as to
    1223                 :          * ensure a consistent representation (current tuple was just
    1224                 :          * handled).  It does not matter if some dumped tuples are already
    1225                 :          * sorted on tape, since serialized tuples lack abbreviated keys
    1226                 :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1227                 :          */
    1228 GNC          77 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1229                 :     }
    1230                 : 
    1231 GIC    19235664 :     switch (state->status)
    1232 ECB             :     {
    1233 CBC    16781339 :         case TSS_INITIAL:
    1234                 : 
    1235 ECB             :             /*
    1236                 :              * Save the tuple into the unsorted array.  First, grow the array
    1237                 :              * as needed.  Note that we try to grow the array when there is
    1238                 :              * still one free slot remaining --- if we fail, there'll still be
    1239                 :              * room to store the incoming tuple, and then we'll switch to
    1240                 :              * tape-based operation.
    1241                 :              */
    1242 GIC    16781339 :             if (state->memtupcount >= state->memtupsize - 1)
    1243 ECB             :             {
    1244 CBC        5244 :                 (void) grow_memtuples(state);
    1245 GIC        5244 :                 Assert(state->memtupcount < state->memtupsize);
    1246                 :             }
    1247 CBC    16781339 :             state->memtuples[state->memtupcount++] = *tuple;
    1248 ECB             : 
    1249                 :             /*
    1250                 :              * Check if it's time to switch over to a bounded heapsort. We do
    1251                 :              * so if the input tuple count exceeds twice the desired tuple
    1252                 :              * count (this is a heuristic for where heapsort becomes cheaper
    1253                 :              * than a quicksort), or if we've just filled workMem and have
    1254                 :              * enough tuples to meet the bound.
    1255                 :              *
    1256                 :              * Note that once we enter TSS_BOUNDED state we will always try to
    1257                 :              * complete the sort that way.  In the worst case, if later input
    1258                 :              * tuples are larger than earlier ones, this might cause us to
    1259                 :              * exceed workMem significantly.
    1260                 :              */
    1261 CBC    16781339 :             if (state->bounded &&
    1262           21874 :                 (state->memtupcount > state->bound * 2 ||
    1263           21674 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1264                 :             {
    1265                 : #ifdef TRACE_SORT
    1266 GIC         200 :                 if (trace_sort)
    1267 UIC           0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1268                 :                          state->memtupcount,
    1269                 :                          pg_rusage_show(&state->ru_start));
    1270                 : #endif
    1271 GIC         200 :                 make_bounded_heap(state);
    1272 GNC         200 :                 MemoryContextSwitchTo(oldcontext);
    1273 GIC         200 :                 return;
    1274                 :             }
    1275                 : 
    1276                 :             /*
    1277                 :              * Done if we still fit in available memory and have array slots.
    1278                 :              */
    1279 CBC    16781139 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1280                 :             {
    1281 GNC    16781081 :                 MemoryContextSwitchTo(oldcontext);
    1282 CBC    16781081 :                 return;
    1283                 :             }
    1284                 : 
    1285 ECB             :             /*
    1286                 :              * Nope; time to switch to tape-based operation.
    1287                 :              */
    1288 GIC          58 :             inittapes(state, true);
    1289                 : 
    1290                 :             /*
    1291                 :              * Dump all tuples.
    1292                 :              */
    1293              58 :             dumptuples(state, false);
    1294              58 :             break;
    1295 ECB             : 
    1296 CBC     1868011 :         case TSS_BOUNDED:
    1297 ECB             : 
    1298                 :             /*
    1299                 :              * We don't want to grow the array here, so check whether the new
    1300                 :              * tuple can be discarded before putting it in.  This should be a
    1301                 :              * good speed optimization, too, since when there are many more
    1302                 :              * input tuples than the bound, most input tuples can be discarded
    1303                 :              * with just this one comparison.  Note that because we currently
    1304                 :              * have the sort direction reversed, we must check for <= not >=.
    1305                 :              */
    1306 CBC     1868011 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1307                 :             {
    1308 ECB             :                 /* new tuple <= top of the heap, so we can discard it */
    1309 GBC     1616559 :                 free_sort_tuple(state, tuple);
    1310 GIC     1616559 :                 CHECK_FOR_INTERRUPTS();
    1311                 :             }
    1312                 :             else
    1313                 :             {
    1314                 :                 /* discard top of heap, replacing it with the new tuple */
    1315          251452 :                 free_sort_tuple(state, &state->memtuples[0]);
    1316          251452 :                 tuplesort_heap_replace_top(state, tuple);
    1317                 :             }
    1318         1868011 :             break;
    1319                 : 
    1320          586314 :         case TSS_BUILDRUNS:
    1321 ECB             : 
    1322                 :             /*
    1323                 :              * Save the tuple into the unsorted array (there must be space)
    1324                 :              */
    1325 GIC      586314 :             state->memtuples[state->memtupcount++] = *tuple;
    1326 ECB             : 
    1327                 :             /*
    1328                 :              * If we are over the memory limit, dump all tuples.
    1329                 :              */
    1330 CBC      586314 :             dumptuples(state, false);
    1331 GIC      586314 :             break;
    1332                 : 
    1333 UIC           0 :         default:
    1334 LBC           0 :             elog(ERROR, "invalid tuplesort state");
    1335 ECB             :             break;
    1336                 :     }
    1337 GNC     2454383 :     MemoryContextSwitchTo(oldcontext);
    1338                 : }
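                          :
                          : /*
                          :  * The TSS_INITIAL-to-TSS_BOUNDED switch condition above, restated as a
                          :  * standalone predicate (a sketch for illustration only): with bound =
                          :  * 100, the switch happens when the 201st tuple arrives, or as soon as
                          :  * memory runs out with at least 101 tuples buffered.
                          :  */
                          : static inline bool
                          : should_switch_to_bounded_heap(int memtupcount, int bound, bool lacks_mem)
                          : {
                          :     return memtupcount > bound * 2 ||
                          :         (memtupcount > bound && lacks_mem);
                          : }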
    1339                 : 
    1340                 : static bool
    1341 GIC     2367567 : consider_abort_common(Tuplesortstate *state)
    1342                 : {
    1343 GNC     2367567 :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1344         2367567 :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1345         2367567 :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1346 ECB             : 
    1347                 :     /*
    1348                 :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1349                 :      * when still within memory limit.
    1350                 :      */
    1351 GIC     2367567 :     if (state->status == TSS_INITIAL &&
    1352 CBC     2100047 :         state->memtupcount >= state->abbrevNext)
    1353                 :     {
    1354 GIC        3163 :         state->abbrevNext *= 2;
    1355                 : 
    1356                 :         /*
    1357                 :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1358 ECB             :          * that abbreviation should not proceed.
    1359 EUB             :          */
    1360 GNC        3163 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1361                 :                                                 state->base.sortKeys))
    1362 GIC        3086 :             return false;
    1363                 : 
    1364                 :         /*
    1365 ECB             :          * Finally, restore authoritative comparator, and indicate that
    1366                 :          * abbreviation is not in play by setting abbrev_converter to NULL
    1367                 :          */
    1368 GNC          77 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1369              77 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1370                 :         /* Not strictly necessary, but be tidy */
    1371              77 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1372              77 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1373                 : 
    1374 ECB             :         /* Give up - expect original pass-by-value representation */
    1375 CBC          77 :         return true;
    1376 ECB             :     }
    1377                 : 
    1378 GIC     2364404 :     return false;
    1379 ECB             : }
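                          :
                          : /*
                          :  * Note on the check frequency above: abbrevNext doubles each time, so
                          :  * abbreviation effectiveness is reconsidered at geometrically thinning
                          :  * points (roughly 10, 20, 40, 80, ... tuples, assuming the initial
                          :  * threshold is 10), keeping the cost of the abbrev_abort() callback
                          :  * negligible no matter how many tuples are inserted.
                          :  */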
    1380                 : 
    1381                 : /*
    1382                 :  * All tuples have been provided; finish the sort.
    1383                 :  */
    1384                 : void
    1385 GIC      134315 : tuplesort_performsort(Tuplesortstate *state)
    1386                 : {
    1387 GNC      134315 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1388 ECB             : 
    1389                 : #ifdef TRACE_SORT
    1390 GIC      134315 :     if (trace_sort)
    1391 LBC           0 :         elog(LOG, "performsort of worker %d starting: %s",
    1392                 :              state->worker, pg_rusage_show(&state->ru_start));
    1393                 : #endif
    1394                 : 
    1395 GIC      134315 :     switch (state->status)
    1396                 :     {
    1397 CBC      134057 :         case TSS_INITIAL:
    1398 ECB             : 
    1399                 :             /*
    1400                 :              * We were able to accumulate all the tuples within the allowed
    1401                 :              * amount of memory, or leader to take over worker tapes
    1402                 :              */
    1403 GIC      134057 :             if (SERIAL(state))
    1404 ECB             :             {
    1405                 :                 /* Just qsort 'em and we're done */
    1406 CBC      133781 :                 tuplesort_sort_memtuples(state);
    1407 GIC      133739 :                 state->status = TSS_SORTEDINMEM;
    1408 ECB             :             }
    1409 CBC         276 :             else if (WORKER(state))
    1410                 :             {
    1411                 :                 /*
    1412 ECB             :                  * Parallel workers must still dump out tuples to tape.  No
    1413                 :                  * merge is required to produce single output run, though.
    1414                 :                  */
    1415 GIC         205 :                 inittapes(state, false);
    1416             205 :                 dumptuples(state, true);
    1417             205 :                 worker_nomergeruns(state);
    1418             205 :                 state->status = TSS_SORTEDONTAPE;
    1419                 :             }
    1420 ECB             :             else
    1421                 :             {
    1422                 :                 /*
    1423                 :                  * Leader will take over worker tapes and merge worker runs.
    1424                 :                  * Note that mergeruns sets the correct state->status.
    1425                 :                  */
    1426 GIC          71 :                 leader_takeover_tapes(state);
    1427              71 :                 mergeruns(state);
    1428                 :             }
    1429 CBC      134015 :             state->current = 0;
    1430 GIC      134015 :             state->eof_reached = false;
    1431 CBC      134015 :             state->markpos_block = 0L;
    1432 GIC      134015 :             state->markpos_offset = 0;
    1433          134015 :             state->markpos_eof = false;
    1434          134015 :             break;
    1435                 : 
    1436             200 :         case TSS_BOUNDED:
    1437                 : 
    1438 ECB             :             /*
    1439                 :              * We were able to accumulate all the tuples required for output
    1440                 :              * in memory, using a heap to eliminate excess tuples.  Now we
    1441                 :              * have to transform the heap to a properly-sorted array.
    1442                 :              * Note that sort_bounded_heap sets the correct state->status.
    1443                 :              */
    1444 CBC         200 :             sort_bounded_heap(state);
    1445             200 :             state->current = 0;
    1446             200 :             state->eof_reached = false;
    1447 GIC         200 :             state->markpos_offset = 0;
    1448             200 :             state->markpos_eof = false;
    1449 CBC         200 :             break;
    1450                 : 
    1451 GIC          58 :         case TSS_BUILDRUNS:
    1452                 : 
    1453                 :             /*
    1454                 :              * Finish tape-based sort.  First, flush all tuples remaining in
    1455 ECB             :              * memory out to tape; then merge until we have a single remaining
    1456                 :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1457                 :              * Note that mergeruns sets the correct state->status.
    1458                 :              */
    1459 GIC          58 :             dumptuples(state, true);
    1460              58 :             mergeruns(state);
    1461              58 :             state->eof_reached = false;
    1462 CBC          58 :             state->markpos_block = 0L;
    1463              58 :             state->markpos_offset = 0;
    1464 GIC          58 :             state->markpos_eof = false;
    1465              58 :             break;
    1466                 : 
    1467 UIC           0 :         default:
    1468               0 :             elog(ERROR, "invalid tuplesort state");
    1469                 :             break;
    1470                 :     }
    1471 ECB             : 
    1472                 : #ifdef TRACE_SORT
    1473 GIC      134273 :     if (trace_sort)
    1474                 :     {
    1475 UIC           0 :         if (state->status == TSS_FINALMERGE)
    1476               0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1477                 :                  state->worker, state->nInputTapes,
    1478                 :                  pg_rusage_show(&state->ru_start));
    1479                 :         else
    1480 LBC           0 :             elog(LOG, "performsort of worker %d done: %s",
    1481                 :                  state->worker, pg_rusage_show(&state->ru_start));
    1482                 :     }
    1483                 : #endif
    1484                 : 
    1485 GIC      134273 :     MemoryContextSwitchTo(oldcontext);
    1486 CBC      134273 : }
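                          :
                          : /*
                          :  * Summary of the status transitions made above (mergeruns() and
                          :  * sort_bounded_heap() set the final status on the paths that call them):
                          :  *
                          :  *   TSS_INITIAL   -> TSS_SORTEDINMEM    serial in-memory qsort
                          :  *   TSS_INITIAL   -> TSS_SORTEDONTAPE   worker dumps one run, no merge
                          :  *   TSS_INITIAL   -> via mergeruns()    leader takes over worker tapes
                          :  *   TSS_BOUNDED   -> TSS_SORTEDINMEM    heap turned into a sorted array
                          :  *   TSS_BUILDRUNS -> via mergeruns()    TSS_SORTEDONTAPE or TSS_FINALMERGE
                          :  */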
    1487                 : 
    1488 ECB             : /*
    1489                 :  * Internal routine to fetch the next tuple in either forward or back
    1490                 :  * direction into *stup.  Returns false if no more tuples.
    1491                 :  * Returned tuple belongs to tuplesort memory context, and must not be freed
    1492                 :  * by caller.  Note that fetched tuple is stored in memory that may be
    1493                 :  * recycled by any future fetch.
    1494                 :  */
    1495                 : bool
    1496 CBC    17912524 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1497 ECB             :                           SortTuple *stup)
    1498                 : {
    1499                 :     unsigned int tuplen;
    1500                 :     size_t      nmoved;
    1501                 : 
    1502 GIC    17912524 :     Assert(!WORKER(state));
    1503                 : 
    1504        17912524 :     switch (state->status)
    1505                 :     {
    1506        15498267 :         case TSS_SORTEDINMEM:
    1507 GNC    15498267 :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1508 CBC    15498267 :             Assert(!state->slabAllocatorUsed);
    1509 GIC    15498267 :             if (forward)
    1510                 :             {
    1511        15498234 :                 if (state->current < state->memtupcount)
    1512                 :                 {
    1513 CBC    15365297 :                     *stup = state->memtuples[state->current++];
    1514        15365297 :                     return true;
    1515 ECB             :                 }
    1516 GIC      132937 :                 state->eof_reached = true;
    1517 ECB             : 
    1518                 :                 /*
    1519                 :                  * Complain if caller tries to retrieve more tuples than
    1520                 :                  * originally asked for in a bounded sort.  This is because
    1521                 :                  * returning EOF here might be the wrong thing.
    1522                 :                  */
    1523 GIC      132937 :                 if (state->bounded && state->current >= state->bound)
    1524 UIC           0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1525                 : 
    1526 GIC      132937 :                 return false;
    1527 ECB             :             }
    1528                 :             else
    1529                 :             {
    1530 GIC          33 :                 if (state->current <= 0)
    1531 UIC           0 :                     return false;
    1532                 : 
    1533                 :                 /*
     1534                 :                  * If all tuples have been fetched already, we return the
     1535                 :                  * last tuple; otherwise, the tuple before the last one returned.
    1536 ECB             :                  */
    1537 CBC          33 :                 if (state->eof_reached)
    1538               6 :                     state->eof_reached = false;
    1539                 :                 else
    1540                 :                 {
    1541 GIC          27 :                     state->current--;    /* last returned tuple */
    1542              27 :                     if (state->current <= 0)
    1543               3 :                         return false;
    1544                 :                 }
    1545              30 :                 *stup = state->memtuples[state->current - 1];
    1546              30 :                 return true;
    1547                 :             }
    1548 ECB             :             break;
    1549 EUB             : 
    1550 GIC      136497 :         case TSS_SORTEDONTAPE:
    1551 GNC      136497 :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1552 GIC      136497 :             Assert(state->slabAllocatorUsed);
    1553                 : 
    1554                 :             /*
    1555                 :              * The slot that held the tuple that we returned in previous
    1556                 :              * gettuple call can now be reused.
    1557 ECB             :              */
    1558 GBC      136497 :             if (state->lastReturnedTuple)
    1559                 :             {
    1560 GIC       76425 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1561           76425 :                 state->lastReturnedTuple = NULL;
    1562                 :             }
    1563 ECB             : 
    1564 CBC      136497 :             if (forward)
    1565                 :             {
    1566          136482 :                 if (state->eof_reached)
    1567 UIC           0 :                     return false;
    1568                 : 
    1569 CBC      136482 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1570 EUB             :                 {
    1571 GIC      136470 :                     READTUP(state, stup, state->result_tape, tuplen);
    1572                 : 
    1573                 :                     /*
    1574                 :                      * Remember the tuple we return, so that we can recycle
    1575                 :                      * its memory on next call.  (This can be NULL, in the
    1576                 :                      * !state->tuples case).
    1577                 :                      */
    1578          136470 :                     state->lastReturnedTuple = stup->tuple;
    1579 ECB             : 
    1580 GIC      136470 :                     return true;
    1581                 :                 }
    1582 ECB             :                 else
    1583 EUB             :                 {
    1584 GIC          12 :                     state->eof_reached = true;
    1585              12 :                     return false;
    1586                 :                 }
    1587                 :             }
    1588 ECB             : 
    1589                 :             /*
    1590                 :              * Backward.
    1591                 :              *
     1592                 :              * If all tuples have been fetched already, we return the last
     1593                 :              * tuple; otherwise, the tuple before the last one returned.
    1594                 :              */
    1595 GIC          15 :             if (state->eof_reached)
    1596                 :             {
    1597                 :                 /*
    1598                 :                  * Seek position is pointing just past the zero tuplen at the
    1599 ECB             :                  * end of file; back up to fetch last tuple's ending length
    1600                 :                  * word.  If seek fails we must have a completely empty file.
    1601                 :                  */
    1602 GIC           6 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1603 ECB             :                                               2 * sizeof(unsigned int));
    1604 GIC           6 :                 if (nmoved == 0)
    1605 UIC           0 :                     return false;
    1606 GIC           6 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1607 UIC           0 :                     elog(ERROR, "unexpected tape position");
    1608 GIC           6 :                 state->eof_reached = false;
    1609                 :             }
    1610                 :             else
    1611                 :             {
    1612 ECB             :                 /*
    1613                 :                  * Back up and fetch previously-returned tuple's ending length
    1614                 :                  * word.  If seek fails, assume we are at start of file.
    1615                 :                  */
    1616 GIC           9 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1617 ECB             :                                               sizeof(unsigned int));
    1618 GBC           9 :                 if (nmoved == 0)
    1619 UIC           0 :                     return false;
    1620 GIC           9 :                 else if (nmoved != sizeof(unsigned int))
    1621 UIC           0 :                     elog(ERROR, "unexpected tape position");
    1622 GIC           9 :                 tuplen = getlen(state->result_tape, false);
    1623                 : 
    1624                 :                 /*
    1625                 :                  * Back up to get ending length word of tuple before it.
    1626                 :                  */
    1627               9 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1628 ECB             :                                               tuplen + 2 * sizeof(unsigned int));
    1629 GIC           9 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1630 ECB             :                 {
    1631                 :                     /*
    1632                 :                      * We backed up over the previous tuple, but there was no
     1633                 :                      * ending length word before it.  That means the previous
     1634                 :                      * tuple is the first tuple in the file.  It is now the
     1635                 :                      * next to read in the forward direction (not obviously
     1636                 :                      * right, but it matches what the in-memory case does).
    1637                 :                      */
    1638 CBC           3 :                     return false;
    1639 ECB             :                 }
    1640 CBC           6 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1641 LBC           0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1642 ECB             :             }
    1643                 : 
    1644 CBC          12 :             tuplen = getlen(state->result_tape, false);
    1645 ECB             : 
    1646                 :             /*
    1647                 :              * Now we have the length of the prior tuple, back up and read it.
    1648                 :              * Note: READTUP expects we are positioned after the initial
    1649 EUB             :              * length word of the tuple, so back up to that point.
    1650                 :              */
    1651 GIC          12 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1652                 :                                           tuplen);
    1653              12 :             if (nmoved != tuplen)
    1654 LBC           0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1655 CBC          12 :             READTUP(state, stup, state->result_tape, tuplen);
    1656                 : 
    1657                 :             /*
    1658                 :              * Remember the tuple we return, so that we can recycle its memory
    1659                 :              * on next call. (This can be NULL, in the Datum case).
    1660                 :              */
    1661              12 :             state->lastReturnedTuple = stup->tuple;
    1662                 : 
    1663              12 :             return true;
    1664                 : 
    1665         2277760 :         case TSS_FINALMERGE:
    1666 GIC     2277760 :             Assert(forward);
    1667 ECB             :             /* We are managing memory ourselves, with the slab allocator. */
    1668 GIC     2277760 :             Assert(state->slabAllocatorUsed);
    1669 ECB             : 
    1670                 :             /*
    1671                 :              * The slab slot holding the tuple that we returned in previous
    1672                 :              * gettuple call can now be reused.
    1673                 :              */
    1674 CBC     2277760 :             if (state->lastReturnedTuple)
    1675                 :             {
    1676 GIC     2247626 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1677 CBC     2247626 :                 state->lastReturnedTuple = NULL;
    1678 ECB             :             }
    1679 EUB             : 
    1680                 :             /*
    1681                 :              * This code should match the inner loop of mergeonerun().
    1682                 :              */
    1683 GIC     2277760 :             if (state->memtupcount > 0)
    1684 ECB             :             {
    1685 CBC     2277650 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1686 GIC     2277650 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1687                 :                 SortTuple   newtup;
    1688                 : 
    1689         2277650 :                 *stup = state->memtuples[0];
    1690                 : 
    1691                 :                 /*
    1692 ECB             :                  * Remember the tuple we return, so that we can recycle its
    1693                 :                  * memory on next call. (This can be NULL, in the Datum case).
    1694                 :                  */
    1695 GIC     2277650 :                 state->lastReturnedTuple = stup->tuple;
    1696 ECB             : 
    1697                 :                 /*
    1698                 :                  * Pull next tuple from tape, and replace the returned tuple
    1699                 :                  * at top of the heap with it.
    1700                 :                  */
    1701 CBC     2277650 :                 if (!mergereadnext(state, srcTape, &newtup))
    1702 ECB             :                 {
    1703                 :                     /*
    1704                 :                      * If no more data, we've reached end of run on this tape.
    1705                 :                      * Remove the top node from the heap.
    1706                 :                      */
    1707 GIC         151 :                     tuplesort_heap_delete_top(state);
    1708 CBC         151 :                     state->nInputRuns--;
    1709 ECB             : 
    1710 EUB             :                     /*
    1711                 :                      * Close the tape.  It'd go away at the end of the sort
    1712                 :                      * anyway, but better to release the memory early.
    1713                 :                      */
    1714 GIC         151 :                     LogicalTapeClose(srcTape);
    1715 CBC         151 :                     return true;
    1716 ECB             :                 }
    1717 GIC     2277499 :                 newtup.srctape = srcTapeIndex;
    1718         2277499 :                 tuplesort_heap_replace_top(state, &newtup);
    1719         2277499 :                 return true;
    1720                 :             }
    1721             110 :             return false;
    1722                 : 
    1723 UIC           0 :         default:
    1724               0 :             elog(ERROR, "invalid tuplesort state");
    1725 ECB             :             return false;       /* keep compiler quiet */
    1726                 :     }
    1727                 : }
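                          :
                          : /*
                          :  * The on-tape layout assumed by the backward-scan logic above: with
                          :  * random access requested, each tuple is written with its length word
                          :  * both before and after it, and a zero length word terminates the tape:
                          :  *
                          :  *   [len1][tuple1][len1] [len2][tuple2][len2] ... [lenN][tupleN][lenN] [0]
                          :  *
                          :  * Scanning backward therefore means reading a trailing length word and
                          :  * backspacing over the record it describes.  When such a backspace can
                          :  * move only tuplen + sizeof(unsigned int) bytes, there is no trailing
                          :  * length word before the record, i.e. it was the first tuple on the tape.
                          :  */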
    1728                 : 
    1729                 : 
    1730                 : /*
    1731                 :  * Advance over N tuples in either forward or back direction,
    1732                 :  * without returning any data.  N==0 is a no-op.
    1733                 :  * Returns true if successful, false if ran out of tuples.
    1734                 :  */
    1735                 : bool
    1736 CBC         196 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1737                 : {
    1738 ECB             :     MemoryContext oldcontext;
    1739                 : 
    1740                 :     /*
    1741                 :      * We don't actually support backwards skip yet, because no callers need
    1742                 :      * it.  The API is designed to allow for that later, though.
    1743                 :      */
    1744 CBC         196 :     Assert(forward);
    1745 GIC         196 :     Assert(ntuples >= 0);
    1746             196 :     Assert(!WORKER(state));
    1747                 : 
    1748             196 :     switch (state->status)
    1749                 :     {
    1750 CBC         184 :         case TSS_SORTEDINMEM:
    1751 GIC         184 :             if (state->memtupcount - state->current >= ntuples)
    1752 ECB             :             {
    1753 CBC         184 :                 state->current += ntuples;
    1754             184 :                 return true;
    1755                 :             }
    1756 UIC           0 :             state->current = state->memtupcount;
    1757               0 :             state->eof_reached = true;
    1758                 : 
    1759                 :             /*
    1760                 :              * Complain if caller tries to retrieve more tuples than
    1761                 :              * originally asked for in a bounded sort.  Returning EOF here
    1762 ECB             :              * would be wrong: the tuples beyond the bound were discarded.
    1763                 :              */
    1764 LBC           0 :             if (state->bounded && state->current >= state->bound)
    1765 UIC           0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1766 ECB             : 
    1767 UIC           0 :             return false;
    1768                 : 
    1769 GIC          12 :         case TSS_SORTEDONTAPE:
    1770                 :         case TSS_FINALMERGE:
    1771                 : 
    1772 ECB             :             /*
    1773                 :              * We could probably optimize these cases better, but for now it's
    1774                 :              * not worth the trouble.
    1775                 :              */
    1776 GNC          12 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1777 CBC      120066 :             while (ntuples-- > 0)
    1778                 :             {
    1779 ECB             :                 SortTuple   stup;
    1780                 : 
    1781 GIC      120054 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1782 ECB             :                 {
    1783 UIC           0 :                     MemoryContextSwitchTo(oldcontext);
    1784 LBC           0 :                     return false;
    1785 ECB             :                 }
    1786 GIC      120054 :                 CHECK_FOR_INTERRUPTS();
    1787 ECB             :             }
    1788 GIC          12 :             MemoryContextSwitchTo(oldcontext);
    1789              12 :             return true;
    1790 ECB             : 
    1791 UIC           0 :         default:
    1792 LBC           0 :             elog(ERROR, "invalid tuplesort state");
    1793 ECB             :             return false;       /* keep compiler quiet */
    1794                 :     }
    1795                 : }
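
A minimal usage sketch of this entry point, assuming backend context; the helper name, 'sortstate' and 'k' are illustrative, not from this file (ordered-set aggregates call the function in roughly this way):

    #include "postgres.h"
    #include "utils/tuplesort.h"

    /* Hypothetical helper: position 'sortstate' just past the first k tuples. */
    static void
    skip_to_kth_tuple(Tuplesortstate *sortstate, int64 k)
    {
        if (k > 0 && !tuplesort_skiptuples(sortstate, k, true))
            elog(ERROR, "missing row in sorted result");
    }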
    1796                 : 
    1797                 : /*
    1798                 :  * tuplesort_merge_order - report merge order we'll use for given memory
    1799                 :  * (note: "merge order" just means the number of input tapes in the merge).
    1800                 :  *
    1801                 :  * This is exported for use by the planner.  allowedMem is in bytes.
    1802                 :  */
    1803                 : int
    1804 GIC        7737 : tuplesort_merge_order(int64 allowedMem)
    1805                 : {
    1806                 :     int         mOrder;
    1807 ECB             : 
    1808                 :     /*----------
    1809                 :      * In the merge phase, we need buffer space for each input and output tape.
    1810                 :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1811                 :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1812                 :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1813                 :      * input tape.
    1814                 :      *
    1815                 :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1816                 :      *            N * TAPE_BUFFER_OVERHEAD
    1817                 :      *
    1818                 :      * Except for the last and next-to-last merge passes, where there can be
    1819                 :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1820                 :      * desired amount of memory available for the input buffers
    1821                 :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1822                 :      * available for the tape buffers (allowedMem).
    1823                 :      *
    1824                 :      * Note: you might be thinking we need to account for the memtuples[]
    1825                 :      * array in this calculation, but we effectively treat that as part of the
    1826                 :      * MERGE_BUFFER_SIZE workspace.
    1827                 :      *----------
    1828                 :      */
    1829 GIC        7737 :     mOrder = allowedMem /
    1830 ECB             :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1831                 : 
    1832                 :     /*
    1833                 :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1834                 :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1835                 :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1836                 :      * additional tape reduces the amount of memory available to build runs,
    1837                 :      * which in turn can cause the same sort to need more runs, which makes
    1838                 :      * merging slower even if it can still be done in a single pass.  Also,
    1839                 :      * high order merges are quite slow due to CPU cache effects; it can be
    1840                 :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1841                 :      * single merge pass across many hundreds of tapes.
    1842                 :      */
    1843 GIC        7737 :     mOrder = Max(mOrder, MINORDER);
    1844 CBC        7737 :     mOrder = Min(mOrder, MAXORDER);
    1845 ECB             : 
    1846 CBC        7737 :     return mOrder;
    1847 ECB             : }
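
To make the arithmetic concrete, here is a self-contained sketch of the same computation.  The constant values are assumptions for illustration (TAPE_BUFFER_OVERHEAD = BLCKSZ = 8192, MERGE_BUFFER_SIZE = 32 * BLCKSZ, MINORDER = 6, MAXORDER = 500); the authoritative definitions live near the top of this file.

    #include <stdio.h>
    #include <stdint.h>

    #define TAPE_BUFFER_OVERHEAD  8192          /* assumed: BLCKSZ */
    #define MERGE_BUFFER_SIZE     (32 * 8192)   /* assumed: 32 * BLCKSZ */
    #define MINORDER              6
    #define MAXORDER              500

    static int
    merge_order(int64_t allowedMem)
    {
        int64_t     mOrder;

        /* memory per merge "slot": one input buffer, one output buffer, workspace */
        mOrder = allowedMem / (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);

        if (mOrder < MINORDER)
            mOrder = MINORDER;
        if (mOrder > MAXORDER)
            mOrder = MAXORDER;
        return (int) mOrder;
    }

    int
    main(void)
    {
        /* 4 MB of allowedMem yields 4194304 / 278528 = 15, i.e. a 15-way merge */
        printf("%d\n", merge_order((int64_t) 4 * 1024 * 1024));
        return 0;
    }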
    1848                 : 
    1849                 : /*
    1850                 :  * Helper function to calculate how much memory to allocate for the read buffer
    1851                 :  * of each input tape in a merge pass.
    1852                 :  *
    1853                 :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1854                 :  *      tapes, both input and output.
    1855                 :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1856                 :  * 'maxOutputTapes' is the maximum number of output tapes we should produce.
    1857                 :  */
    1858                 : static int64
    1859 GIC         177 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1860 ECB             :                        int maxOutputTapes)
    1861                 : {
    1862                 :     int         nOutputRuns;
    1863                 :     int         nOutputTapes;
    1864                 : 
    1865                 :     /*
    1866                 :      * How many output tapes will we produce in this pass?
    1867                 :      *
    1868                 :      * This is nInputRuns / nInputTapes, rounded up.
    1869                 :      */
    1870 GIC         177 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1871                 : 
    1872 CBC         177 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1873 ECB             : 
    1874                 :     /*
    1875                 :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1876                 :      * remaining memory is divided evenly between the input tapes.
    1877                 :      *
    1878                 :      * This also follows from the formula in tuplesort_merge_order, but here
    1879                 :      * we derive the input buffer size from the amount of memory available,
    1880                 :      * and M and N.
    1881                 :      */
    1882 GIC         177 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1883                 : }
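
For example, with avail_mem = 4 MB, nInputTapes = 15, nInputRuns = 30 and maxOutputTapes = 15 (and assuming TAPE_BUFFER_OVERHEAD = 8192): nOutputRuns = ceil(30 / 15) = 2, so two output tapes are charged 16384 bytes, and each input tape receives (4194304 - 16384) / 15 = 278528 bytes of read buffer.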
    1884 ECB             : 
    1885                 : /*
    1886                 :  * inittapes - initialize for tape sorting.
    1887                 :  *
    1888                 :  * This is called only if we have found we won't sort in memory.
    1889                 :  */
    1890                 : static void
    1891 GIC         263 : inittapes(Tuplesortstate *state, bool mergeruns)
    1892 ECB             : {
    1893 GIC         263 :     Assert(!LEADER(state));
    1894                 : 
    1895             263 :     if (mergeruns)
    1896                 :     {
    1897                 :         /* Compute number of input tapes to use when merging */
    1898              58 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1899 ECB             :     }
    1900                 :     else
    1901                 :     {
    1902                 :         /* Workers can sometimes produce a single run, output without merge */
    1903 CBC         205 :         Assert(WORKER(state));
    1904 GIC         205 :         state->maxTapes = MINORDER;
    1905 ECB             :     }
    1906                 : 
    1907                 : #ifdef TRACE_SORT
    1908 CBC         263 :     if (trace_sort)
    1909 LBC           0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1910 ECB             :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1911                 : #endif
    1912                 : 
    1913                 :     /* Create the tape set */
    1914 GIC         263 :     inittapestate(state, state->maxTapes);
    1915 CBC         263 :     state->tapeset =
    1916             263 :         LogicalTapeSetCreate(false,
    1917 GIC         263 :                              state->shared ? &state->shared->fileset : NULL,
    1918                 :                              state->worker);
    1919                 : 
    1920             263 :     state->currentRun = 0;
    1921                 : 
    1922                 :     /*
    1923                 :      * Initialize logical tape arrays.
    1924 ECB             :      */
    1925 GIC         263 :     state->inputTapes = NULL;
    1926 CBC         263 :     state->nInputTapes = 0;
    1927 GIC         263 :     state->nInputRuns = 0;
    1928                 : 
    1929 CBC         263 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1930 GIC         263 :     state->nOutputTapes = 0;
    1931 CBC         263 :     state->nOutputRuns = 0;
    1932 ECB             : 
    1933 GIC         263 :     state->status = TSS_BUILDRUNS;
    1934 ECB             : 
    1935 GIC         263 :     selectnewtape(state);
    1936             263 : }
    1937                 : 
    1938                 : /*
    1939                 :  * inittapestate - initialize generic tape management state
    1940                 :  */
    1941                 : static void
    1942 CBC         334 : inittapestate(Tuplesortstate *state, int maxTapes)
    1943                 : {
    1944                 :     int64       tapeSpace;
    1945                 : 
    1946 ECB             :     /*
    1947                 :      * Decrease availMem to reflect the space needed for tape buffers; but
    1948 EUB             :      * don't decrease it to the point that we have no room for tuples. (That
    1949 ECB             :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1950 EUB             :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1951 ECB             :      * half of allowedMem.  In the pass-by-value case it's not important to
    1952                 :      * account for tuple space, so we don't care if LACKMEM becomes
    1953                 :      * inaccurate.)
    1954                 :      */
    1955 CBC         334 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1956                 : 
    1957             334 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1958 GIC         301 :         USEMEM(state, tapeSpace);
    1959 ECB             : 
    1960                 :     /*
    1961                 :      * Make sure that the temp file(s) underlying the tape set are created in
    1962                 :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1963                 :      * called already, but it doesn't matter if it is called a second time.
    1964                 :      */
    1965 GIC         334 :     PrepareTempTablespaces();
    1966             334 : }
    1967                 : 
    1968                 : /*
    1969 ECB             :  * selectnewtape -- select next tape to output to.
    1970                 :  *
    1971                 :  * This is called after finishing a run when we know another run
    1972                 :  * must be started.  This is used both when building the initial
    1973                 :  * runs, and during merge passes.
    1974                 :  */
    1975                 : static void
    1976 GIC        1825 : selectnewtape(Tuplesortstate *state)
    1977 ECB             : {
    1978                 :     /*
    1979                 :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1980 EUB             :      * both zero.  On each call, we create a new output tape to hold the next
    1981                 :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1982                 :      * existing tapes in a round-robin fashion.
    1983 ECB             :      */
    1984 GIC        1825 :     if (state->nOutputTapes < state->maxTapes)
    1985 ECB             :     {
    1986                 :         /* Create a new tape to hold the next run */
    1987 CBC         673 :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1988 GIC         673 :         Assert(state->nOutputRuns == state->nOutputTapes);
    1989             673 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1990             673 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1991             673 :         state->nOutputTapes++;
    1992             673 :         state->nOutputRuns++;
    1993                 :     }
    1994                 :     else
    1995                 :     {
    1996                 :         /*
    1997                 :          * We have reached the max number of tapes.  Append to an existing
    1998                 :          * tape.
    1999                 :          */
    2000            1152 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    2001            1152 :         state->nOutputRuns++;
    2002                 :     }
    2003 CBC        1825 : }
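
For example, with maxTapes = 6, the first six runs each get a fresh tape.  When the seventh run starts, nOutputRuns is 6, so it is appended to outputTapes[6 % 6] = tape 0; the eighth goes to tape 1, and so on around the ring.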
    2004                 : 
    2005                 : /*
    2006                 :  * Initialize the slab allocation arena, for the given number of slots.
    2007 ECB             :  */
    2008                 : static void
    2009 GIC         129 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    2010 ECB             : {
    2011 CBC         129 :     if (numSlots > 0)
    2012                 :     {
    2013 ECB             :         char       *p;
    2014                 :         int         i;
    2015                 : 
    2016 GIC         123 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    2017             123 :         state->slabMemoryEnd = state->slabMemoryBegin +
    2018             123 :             numSlots * SLAB_SLOT_SIZE;
    2019             123 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    2020             123 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    2021                 : 
    2022             123 :         p = state->slabMemoryBegin;
    2023             510 :         for (i = 0; i < numSlots - 1; i++)
    2024 ECB             :         {
    2025 GIC         387 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    2026             387 :             p += SLAB_SLOT_SIZE;
    2027                 :         }
    2028 CBC         123 :         ((SlabSlot *) p)->nextfree = NULL;
    2029                 :     }
    2030 ECB             :     else
    2031                 :     {
    2032 CBC           6 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    2033               6 :         state->slabFreeHead = NULL;
    2034 ECB             :     }
    2035 CBC         129 :     state->slabAllocatorUsed = true;
    2036 GIC         129 : }
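
The slots chained above are consumed and recycled as a simple LIFO free list.  A minimal standalone sketch of that pattern follows; the names and the 1 kB slot size are assumptions, and the real code does the recycling via the RELEASE_SLAB_SLOT macro, falling back to the tuple context for oversized tuples:

    #include <stddef.h>

    typedef union SlabSlotSketch
    {
        union SlabSlotSketch *nextfree; /* valid only while the slot is free */
        char        buffer[1024];       /* assumed slot size */
    } SlabSlotSketch;

    static SlabSlotSketch *freeHead;    /* head of the free list */

    static void *
    slab_alloc(void)
    {
        SlabSlotSketch *slot = freeHead;

        if (slot == NULL)
            return NULL;                /* caller must allocate elsewhere */
        freeHead = slot->nextfree;      /* pop */
        return slot;
    }

    static void
    slab_free(void *p)
    {
        SlabSlotSketch *slot = (SlabSlotSketch *) p;

        slot->nextfree = freeHead;      /* push */
        freeHead = slot;
    }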
    2037 ECB             : 
    2038                 : /*
    2039                 :  * mergeruns -- merge all the completed initial runs.
    2040                 :  *
    2041                 :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    2042                 :  * already been written to initial runs on tape (see dumptuples).
    2043                 :  */
    2044                 : static void
    2045 GIC         129 : mergeruns(Tuplesortstate *state)
    2046                 : {
    2047 ECB             :     int         tapenum;
    2048                 : 
    2049 GIC         129 :     Assert(state->status == TSS_BUILDRUNS);
    2050 CBC         129 :     Assert(state->memtupcount == 0);
    2051 ECB             : 
    2052 GNC         129 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    2053                 :     {
    2054                 :         /*
    2055                 :          * If there are multiple runs to be merged, when we go to read back
    2056                 :          * tuples from disk, abbreviated keys will not have been stored, and
    2057                 :          * we don't care to regenerate them.  Disable abbreviation from this
    2058                 :          * point on.
    2059                 :          */
    2060              15 :         state->base.sortKeys->abbrev_converter = NULL;
    2061              15 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    2062                 : 
    2063                 :         /* Not strictly necessary, but be tidy */
    2064              15 :         state->base.sortKeys->abbrev_abort = NULL;
    2065              15 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    2066                 :     }
    2067 ECB             : 
    2068                 :     /*
    2069                 :      * Reset tuple memory.  We've freed all the tuples that we previously
    2070                 :      * allocated.  We will use the slab allocator from now on.
    2071                 :      */
    2072 GNC         129 :     MemoryContextResetOnly(state->base.tuplecontext);
    2073                 : 
    2074 ECB             :     /*
    2075                 :      * We no longer need a large memtuples array.  (We will allocate a smaller
    2076                 :      * one for the heap later.)
    2077                 :      */
    2078 CBC         129 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2079 GIC         129 :     pfree(state->memtuples);
    2080             129 :     state->memtuples = NULL;
    2081                 : 
    2082                 :     /*
    2083                 :      * Initialize the slab allocator.  We need one slab slot per input tape,
    2084                 :      * for the tuples in the heap, plus one to hold the tuple last returned
    2085                 :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    2086                 :      * however, we don't need to allocate anything.)
    2087                 :      *
    2088                 :      * In a multi-pass merge, we could shrink this allocation for the last
    2089                 :      * merge pass, if it has fewer tapes than previous passes, but we don't
    2090                 :      * bother.
    2091                 :      *
    2092                 :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    2093                 :      * to track memory usage of individual tuples.
    2094                 :      */
    2095 GNC         129 :     if (state->base.tuples)
    2096 GIC         123 :         init_slab_allocator(state, state->nOutputTapes + 1);
    2097 ECB             :     else
    2098 GIC           6 :         init_slab_allocator(state, 0);
    2099                 : 
    2100 ECB             :     /*
    2101                 :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    2102                 :      * from each input tape.
    2103                 :      *
    2104                 :      * We could shrink this, too, between passes in a multi-pass merge, but we
    2105                 :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    2106                 :      * number of input tapes will not increase between passes.)
    2107                 :      */
    2108 CBC         129 :     state->memtupsize = state->nOutputTapes;
    2109 GNC         258 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    2110 CBC         129 :                                                         state->nOutputTapes * sizeof(SortTuple));
    2111             129 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2112                 : 
    2113                 :     /*
    2114                 :      * Use all the remaining memory we have available for tape buffers among
    2115                 :      * all the input tapes.  At the beginning of each merge pass, we will
    2116                 :      * divide this memory between the input and output tapes in the pass.
    2117 ECB             :      */
    2118 GIC         129 :     state->tape_buffer_mem = state->availMem;
    2119             129 :     USEMEM(state, state->tape_buffer_mem);
    2120 ECB             : #ifdef TRACE_SORT
    2121 CBC         129 :     if (trace_sort)
    2122 LBC           0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    2123 ECB             :              state->worker, state->tape_buffer_mem / 1024);
    2124                 : #endif
    2125                 : 
    2126                 :     for (;;)
    2127                 :     {
    2128                 :         /*
    2129                 :          * On the first iteration, or if we have read all the runs from the
    2130                 :          * input tapes in a multi-pass merge, it's time to start a new pass.
    2131                 :          * Rewind all the output tapes, and make them inputs for the next
    2132                 :          * pass.
    2133                 :          */
    2134 GIC         390 :         if (state->nInputRuns == 0)
    2135 ECB             :         {
    2136                 :             int64       input_buffer_size;
    2137                 : 
    2138                 :             /* Close the old, emptied, input tapes */
    2139 CBC         177 :             if (state->nInputTapes > 0)
    2140 ECB             :             {
    2141 CBC         336 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2142 GIC         288 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2143              48 :                 pfree(state->inputTapes);
    2144                 :             }
    2145                 : 
    2146                 :             /* Previous pass's outputs become next pass's inputs. */
    2147             177 :             state->inputTapes = state->outputTapes;
    2148             177 :             state->nInputTapes = state->nOutputTapes;
    2149             177 :             state->nInputRuns = state->nOutputRuns;
    2150                 : 
    2151                 :             /*
    2152                 :              * Reset output tape variables.  The actual LogicalTapes will be
    2153                 :              * created as needed, here we only allocate the array to hold
    2154                 :              * them.
    2155 ECB             :              */
    2156 GIC         177 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2157 CBC         177 :             state->nOutputTapes = 0;
    2158             177 :             state->nOutputRuns = 0;
    2159                 : 
    2160                 :             /*
    2161                 :              * Redistribute the memory allocated for tape buffers, among the
    2162 ECB             :              * new input and output tapes.
    2163                 :              */
    2164 GIC         177 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2165 ECB             :                                                        state->nInputTapes,
    2166                 :                                                        state->nInputRuns,
    2167                 :                                                        state->maxTapes);
    2168                 : 
    2169                 : #ifdef TRACE_SORT
    2170 GBC         177 :             if (trace_sort)
    2171 UIC           0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2172                 :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2173                 :                      pg_rusage_show(&state->ru_start));
    2174                 : #endif
    2175                 : 
    2176                 :             /* Prepare the new input tapes for merge pass. */
    2177 CBC         777 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2178             600 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2179                 : 
    2180                 :             /*
    2181                 :              * If there's just one run left on each input tape, then only one
    2182                 :              * merge pass remains.  If we don't have to produce a materialized
    2183                 :              * sorted tape, we can stop at this point and do the final merge
    2184 ECB             :              * on-the-fly.
    2185                 :              */
    2186 GNC         177 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2187 GIC         162 :                 && state->nInputRuns <= state->nInputTapes
    2188             120 :                 && !WORKER(state))
    2189                 :             {
    2190                 :                 /* Tell logtape.c we won't be writing anymore */
    2191             119 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2192                 :                 /* Initialize for the final merge pass */
    2193             119 :                 beginmerge(state);
    2194 CBC         119 :                 state->status = TSS_FINALMERGE;
    2195             119 :                 return;
    2196 ECB             :             }
    2197                 :         }
    2198                 : 
    2199                 :         /* Select an output tape */
    2200 CBC         271 :         selectnewtape(state);
    2201                 : 
    2202 ECB             :         /* Merge one run from each input tape. */
    2203 GIC         271 :         mergeonerun(state);
    2204 ECB             : 
    2205                 :         /*
    2206                 :          * If the input tapes are empty, and we output only one output run,
    2207                 :          * we're done.  The current output tape contains the final result.
    2208                 :          */
    2209 GIC         271 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2210              10 :             break;
    2211                 :     }
    2212                 : 
    2213                 :     /*
    2214 ECB             :      * Done.  The result is on a single run on a single tape.
    2215                 :      */
    2216 CBC          10 :     state->result_tape = state->outputTapes[0];
    2217 GIC          10 :     if (!WORKER(state))
    2218 CBC           9 :         LogicalTapeFreeze(state->result_tape, NULL);
    2219 ECB             :     else
    2220 CBC           1 :         worker_freeze_result_tape(state);
    2221 GIC          10 :     state->status = TSS_SORTEDONTAPE;
    2222 ECB             : 
    2223                 :     /* Close all the now-empty input tapes, to release their read buffers. */
    2224 GIC          45 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2225 CBC          35 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2226                 : }
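
As a concrete walk-through: with 30 initial runs and maxTapes = 15, the first pass makes two calls to mergeonerun (two 15-way merges), leaving 2 runs on 2 output tapes.  Those become the inputs of the next pass; since 2 runs fit on the input tapes, and assuming no TUPLESORT_RANDOMACCESS and no worker, the loop stops there in TSS_FINALMERGE and the final merge happens on the fly as tuples are requested.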
    2227 ECB             : 
    2228 EUB             : /*
    2229 ECB             :  * Merge one run from each input tape.
    2230 EUB             :  */
    2231                 : static void
    2232 CBC         271 : mergeonerun(Tuplesortstate *state)
    2233                 : {
    2234                 :     int         srcTapeIndex;
    2235                 :     LogicalTape *srcTape;
    2236                 : 
    2237 ECB             :     /*
    2238                 :      * Start the merge by loading one tuple from each active source tape into
    2239                 :      * the heap.
    2240                 :      */
    2241 GIC         271 :     beginmerge(state);
    2242 ECB             : 
    2243 GNC         271 :     Assert(state->slabAllocatorUsed);
    2244                 : 
    2245 ECB             :     /*
    2246                 :      * Execute merge by repeatedly extracting lowest tuple in heap, writing it
    2247                 :      * out, and replacing it with next tuple from same tape (if there is
    2248                 :      * another one).
    2249                 :      */
    2250 GIC     1582631 :     while (state->memtupcount > 0)
    2251                 :     {
    2252                 :         SortTuple   stup;
    2253                 : 
    2254 ECB             :         /* write the tuple to destTape */
    2255 GIC     1582360 :         srcTapeIndex = state->memtuples[0].srctape;
    2256 CBC     1582360 :         srcTape = state->inputTapes[srcTapeIndex];
    2257         1582360 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2258                 : 
    2259 ECB             :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2260 CBC     1582360 :         if (state->memtuples[0].tuple)
    2261         1522270 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2262 ECB             : 
    2263                 :         /*
    2264                 :          * pull next tuple from the tape, and replace the written-out tuple in
    2265                 :          * the heap with it.
    2266                 :          */
    2267 GIC     1582360 :         if (mergereadnext(state, srcTape, &stup))
    2268                 :         {
    2269         1580885 :             stup.srctape = srcTapeIndex;
    2270         1580885 :             tuplesort_heap_replace_top(state, &stup);
    2271                 :         }
    2272                 :         else
    2273                 :         {
    2274            1475 :             tuplesort_heap_delete_top(state);
    2275            1475 :             state->nInputRuns--;
    2276                 :         }
    2277                 :     }
    2278                 : 
    2279                 :     /*
    2280                 :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2281                 :      * output tape.
    2282                 :      */
    2283             271 :     markrunend(state->destTape);
    2284             271 : }
    2285                 : 
    2286                 : /*
    2287                 :  * beginmerge - initialize for a merge pass
    2288                 :  *
    2289                 :  * Fill the merge heap with the first tuple from each input tape.
    2290                 :  */
    2291                 : static void
    2292             390 : beginmerge(Tuplesortstate *state)
    2293                 : {
    2294                 :     int         activeTapes;
    2295                 :     int         srcTapeIndex;
    2296                 : 
    2297                 :     /* Heap should be empty here */
    2298             390 :     Assert(state->memtupcount == 0);
    2299                 : 
    2300             390 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2301                 : 
    2302            2142 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2303                 :     {
    2304                 :         SortTuple   tup;
    2305                 : 
    2306            1752 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2307                 :         {
    2308            1641 :             tup.srctape = srcTapeIndex;
    2309            1641 :             tuplesort_heap_insert(state, &tup);
    2310                 :         }
    2311                 :     }
    2312             390 : }
    2313                 : 
    2314                 : /*
    2315                 :  * mergereadnext - read next tuple from one merge input tape
    2316                 :  *
    2317                 :  * Returns false on EOF.
    2318                 :  */
    2319                 : static bool
    2320         3861762 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2321                 : {
    2322                 :     unsigned int tuplen;
    2323                 : 
    2324                 :     /* read next tuple, if any */
    2325         3861762 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2326            1737 :         return false;
    2327         3860025 :     READTUP(state, stup, srcTape, tuplen);
    2328                 : 
    2329         3860025 :     return true;
    2330                 : }
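
This relies on the on-tape run format: every tuple is preceded by an unsigned length word, and markrunend terminates each run with a zero length word, which getlen(..., true) reports as 0 here rather than raising an error.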
    2331                 : 
    2332                 : /*
    2333                 :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2334                 :  *
    2335                 :  * When alltuples = true, dump everything currently in memory.  (This case is
    2336                 :  * only used at end of input data.)
    2337                 :  */
    2338                 : static void
    2339          586635 : dumptuples(Tuplesortstate *state, bool alltuples)
    2340                 : {
    2341                 :     int         memtupwrite;
    2342                 :     int         i;
    2343                 : 
    2344                 :     /*
    2345                 :      * Nothing to do if we still fit in available memory and have array slots,
    2346                 :      * unless this is the final call during initial run generation.
    2347                 :      */
    2348          586635 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2349          585344 :         !alltuples)
    2350          585081 :         return;
    2351                 : 
    2352                 :     /*
    2353                 :      * Final call might require no sorting, in rare cases where we just so
    2354                 :      * happen to have previously LACKMEM()'d at the point where exactly all
    2355                 :      * remaining tuples are loaded into memory, just before input was
    2356                 :      * exhausted.  In general, short final runs are quite possible, but avoid
    2357                 :      * creating a completely empty run.  In a worker, though, we must produce
    2358                 :      * at least one tape, even if it's empty.
    2359                 :      */
    2360            1554 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2361 UIC           0 :         return;
    2362                 : 
    2363 GIC        1554 :     Assert(state->status == TSS_BUILDRUNS);
    2364                 : 
    2365                 :     /*
    2366                 :      * It seems unlikely that this limit will ever be exceeded, but take no
    2367                 :      * chances
    2368                 :      */
    2369            1554 :     if (state->currentRun == INT_MAX)
    2370 UIC           0 :         ereport(ERROR,
    2371                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2372                 :                  errmsg("cannot have more than %d runs for an external sort",
    2373                 :                         INT_MAX)));
    2374                 : 
    2375 GIC        1554 :     if (state->currentRun > 0)
    2376            1291 :         selectnewtape(state);
    2377                 : 
    2378            1554 :     state->currentRun++;
    2379                 : 
    2380                 : #ifdef TRACE_SORT
    2381            1554 :     if (trace_sort)
    2382 UIC           0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2383                 :              state->worker, state->currentRun,
    2384                 :              pg_rusage_show(&state->ru_start));
    2385                 : #endif
    2386                 : 
    2387                 :     /*
    2388                 :      * Sort all tuples accumulated within the allowed amount of memory for
    2389                 :      * this run using quicksort
    2390                 :      */
    2391 GIC        1554 :     tuplesort_sort_memtuples(state);
    2392                 : 
    2393                 : #ifdef TRACE_SORT
    2394            1554 :     if (trace_sort)
    2395 UIC           0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2396                 :              state->worker, state->currentRun,
    2397                 :              pg_rusage_show(&state->ru_start));
    2398                 : #endif
    2399                 : 
    2400 GIC        1554 :     memtupwrite = state->memtupcount;
    2401         2526830 :     for (i = 0; i < memtupwrite; i++)
    2402                 :     {
    2403 GNC     2525276 :         SortTuple  *stup = &state->memtuples[i];
    2404                 : 
    2405         2525276 :         WRITETUP(state, state->destTape, stup);
    2406                 : 
    2407                 :         /*
    2408                 :          * Account for freeing the tuple, but no need to do the actual pfree
    2409                 :          * since the tuplecontext is being reset after the loop.
    2410                 :          */
    2411         2525276 :         if (stup->tuple != NULL)
    2412         2435219 :             FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    2413                 :     }
    2414                 : 
    2415            1554 :     state->memtupcount = 0;
    2416                 : 
    2417                 :     /*
    2418                 :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2419                 :      * allocated.  It's important to avoid fragmentation when there is a stark
    2420                 :      * change in the sizes of incoming tuples.  Fragmentation due to
    2421                 :      * AllocSetFree's bucketing by size class might be particularly bad if
    2422                 :      * this step wasn't taken.
    2423                 :      */
    2424            1554 :     MemoryContextReset(state->base.tuplecontext);
    2425                 : 
    2426 GIC        1554 :     markrunend(state->destTape);
    2427                 : 
    2428                 : #ifdef TRACE_SORT
    2429            1554 :     if (trace_sort)
    2430 UIC           0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2431                 :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2432                 :              pg_rusage_show(&state->ru_start));
    2433                 : #endif
    2434                 : }
    2435                 : 
    2436                 : /*
    2437                 :  * tuplesort_rescan     - rewind and replay the scan
    2438                 :  */
    2439                 : void
    2440 GIC          29 : tuplesort_rescan(Tuplesortstate *state)
    2441                 : {
    2442 GNC          29 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2443                 : 
    2444              29 :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2445                 : 
    2446 GIC          29 :     switch (state->status)
    2447                 :     {
    2448              26 :         case TSS_SORTEDINMEM:
    2449              26 :             state->current = 0;
    2450              26 :             state->eof_reached = false;
    2451              26 :             state->markpos_offset = 0;
    2452              26 :             state->markpos_eof = false;
    2453              26 :             break;
    2454               3 :         case TSS_SORTEDONTAPE:
    2455               3 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2456               3 :             state->eof_reached = false;
    2457               3 :             state->markpos_block = 0L;
    2458               3 :             state->markpos_offset = 0;
    2459               3 :             state->markpos_eof = false;
    2460               3 :             break;
    2461 UIC           0 :         default:
    2462               0 :             elog(ERROR, "invalid tuplesort state");
    2463                 :             break;
    2464                 :     }
    2465                 : 
    2466 GIC          29 :     MemoryContextSwitchTo(oldcontext);
    2467              29 : }
    2468                 : 
    2469                 : /*
    2470                 :  * tuplesort_markpos    - saves current position in the merged sort file
    2471                 :  */
    2472                 : void
    2473          284485 : tuplesort_markpos(Tuplesortstate *state)
    2474                 : {
    2475 GNC      284485 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2476                 : 
    2477          284485 :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2478                 : 
    2479 GIC      284485 :     switch (state->status)
    2480                 :     {
    2481          280081 :         case TSS_SORTEDINMEM:
    2482          280081 :             state->markpos_offset = state->current;
    2483          280081 :             state->markpos_eof = state->eof_reached;
    2484          280081 :             break;
    2485            4404 :         case TSS_SORTEDONTAPE:
    2486            4404 :             LogicalTapeTell(state->result_tape,
    2487                 :                             &state->markpos_block,
    2488                 :                             &state->markpos_offset);
    2489            4404 :             state->markpos_eof = state->eof_reached;
    2490            4404 :             break;
    2491 UIC           0 :         default:
    2492               0 :             elog(ERROR, "invalid tuplesort state");
    2493                 :             break;
    2494                 :     }
    2495                 : 
    2496 GIC      284485 :     MemoryContextSwitchTo(oldcontext);
    2497          284485 : }
    2498                 : 
    2499                 : /*
    2500                 :  * tuplesort_restorepos - restores current position in merged sort file to
    2501                 :  *                        last saved position
    2502                 :  */
    2503                 : void
    2504           14909 : tuplesort_restorepos(Tuplesortstate *state)
    2505                 : {
    2506 GNC       14909 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2507                 : 
    2508           14909 :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2509                 : 
    2510 GIC       14909 :     switch (state->status)
    2511                 :     {
    2512           11813 :         case TSS_SORTEDINMEM:
    2513           11813 :             state->current = state->markpos_offset;
    2514           11813 :             state->eof_reached = state->markpos_eof;
    2515           11813 :             break;
    2516            3096 :         case TSS_SORTEDONTAPE:
    2517            3096 :             LogicalTapeSeek(state->result_tape,
    2518                 :                             state->markpos_block,
    2519                 :                             state->markpos_offset);
    2520            3096 :             state->eof_reached = state->markpos_eof;
    2521            3096 :             break;
    2522 UIC           0 :         default:
    2523               0 :             elog(ERROR, "invalid tuplesort state");
    2524                 :             break;
    2525                 :     }
    2526                 : 
    2527 GIC       14909 :     MemoryContextSwitchTo(oldcontext);
    2528           14909 : }
    2529                 : 
    2530                 : /*
    2531                 :  * tuplesort_get_stats - extract summary statistics
    2532                 :  *
    2533                 :  * This can be called after tuplesort_performsort() finishes to obtain
    2534                 :  * printable summary information about how the sort was performed.
    2535                 :  */
    2536                 : void
    2537             180 : tuplesort_get_stats(Tuplesortstate *state,
    2538                 :                     TuplesortInstrumentation *stats)
    2539                 : {
    2540                 :     /*
    2541                 :      * Note: it might seem we should provide both memory and disk usage for a
    2542                 :      * disk-based sort.  However, the current code doesn't track memory space
    2543                 :      * accurately once we have begun to return tuples to the caller (since we
    2544                 :      * don't account for pfree's the caller is expected to do), so we cannot
    2545                 :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2546                 :      * to fix.  Is it worth creating an API for the memory context code to
    2547                 :      * tell us how much is actually used in sortcontext?
    2548                 :      */
    2549             180 :     tuplesort_updatemax(state);
    2550                 : 
    2551             180 :     if (state->isMaxSpaceDisk)
    2552 UIC           0 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2553                 :     else
    2554 GIC         180 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2555             180 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2556                 : 
    2557             180 :     switch (state->maxSpaceStatus)
    2558                 :     {
    2559             180 :         case TSS_SORTEDINMEM:
    2560             180 :             if (state->boundUsed)
    2561              21 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2562                 :             else
    2563             159 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2564             180 :             break;
    2565 UIC           0 :         case TSS_SORTEDONTAPE:
    2566               0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2567               0 :             break;
    2568               0 :         case TSS_FINALMERGE:
    2569               0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2570               0 :             break;
    2571               0 :         default:
    2572               0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2573               0 :             break;
    2574                 :     }
    2575 GIC         180 : }
    2576                 : 
    2577                 : /*
    2578                 :  * Convert TuplesortMethod to a string.
    2579                 :  */
    2580                 : const char *
    2581             129 : tuplesort_method_name(TuplesortMethod m)
    2582                 : {
    2583             129 :     switch (m)
    2584                 :     {
    2585 UIC           0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2586               0 :             return "still in progress";
    2587 GIC          21 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2588              21 :             return "top-N heapsort";
    2589             108 :         case SORT_TYPE_QUICKSORT:
    2590             108 :             return "quicksort";
    2591 UIC           0 :         case SORT_TYPE_EXTERNAL_SORT:
    2592               0 :             return "external sort";
    2593               0 :         case SORT_TYPE_EXTERNAL_MERGE:
    2594               0 :             return "external merge";
    2595                 :     }
    2596                 : 
    2597               0 :     return "unknown";
    2598                 : }
    2599                 : 
    2600                 : /*
    2601                 :  * Convert TuplesortSpaceType to a string.
    2602                 :  */
    2603                 : const char *
    2604 GIC         111 : tuplesort_space_type_name(TuplesortSpaceType t)
    2605                 : {
    2606             111 :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2607             111 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2608                 : }
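
A sketch of how a caller might consume these three functions after tuplesort_performsort(); this mirrors what EXPLAIN's sort instrumentation does, but the function name show_sort_info_sketch is hypothetical:

    #include "postgres.h"
    #include "utils/tuplesort.h"

    static void
    show_sort_info_sketch(Tuplesortstate *state)
    {
        TuplesortInstrumentation stats;

        tuplesort_get_stats(state, &stats);
        elog(LOG, "Sort Method: %s  %s: " INT64_FORMAT "kB",
             tuplesort_method_name(stats.sortMethod),
             tuplesort_space_type_name(stats.spaceType),
             stats.spaceUsed);
    }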
    2609                 : 
    2610                 : 
    2611                 : /*
    2612                 :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2613                 :  */
    2614                 : 
    2615                 : /*
    2616                 :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2617                 :  * discarding all but the smallest "state->bound" tuples.
    2618                 :  *
    2619                 :  * When working with a bounded heap, we want to keep the largest entry
    2620                 :  * at the root (array entry zero), instead of the smallest as in the normal
    2621                 :  * sort case.  This allows us to discard the largest entry cheaply.
    2622                 :  * Therefore, we temporarily reverse the sort direction.
    2623                 :  */
    2624                 : static void
    2625             200 : make_bounded_heap(Tuplesortstate *state)
    2626                 : {
    2627             200 :     int         tupcount = state->memtupcount;
    2628                 :     int         i;
    2629                 : 
    2630             200 :     Assert(state->status == TSS_INITIAL);
    2631             200 :     Assert(state->bounded);
    2632             200 :     Assert(tupcount >= state->bound);
    2633             200 :     Assert(SERIAL(state));
    2634                 : 
    2635                 :     /* Reverse sort direction so largest entry will be at root */
    2636             200 :     reversedirection(state);
    2637                 : 
    2638             200 :     state->memtupcount = 0;      /* make the heap empty */
    2639           18948 :     for (i = 0; i < tupcount; i++)
    2640                 :     {
    2641           18748 :         if (state->memtupcount < state->bound)
    2642                 :         {
    2643                 :             /* Insert next tuple into heap */
    2644                 :             /* Must copy source tuple to avoid possible overwrite */
    2645            9274 :             SortTuple   stup = state->memtuples[i];
    2646                 : 
    2647            9274 :             tuplesort_heap_insert(state, &stup);
    2648                 :         }
    2649                 :         else
    2650                 :         {
    2651                 :             /*
    2652                 :              * The heap is full.  Replace the largest entry with the new
    2653                 :              * tuple, or just discard it, if it's larger than anything already
    2654                 :              * in the heap.
    2655                 :              */
    2656            9474 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2657                 :             {
    2658            4884 :                 free_sort_tuple(state, &state->memtuples[i]);
    2659            4884 :                 CHECK_FOR_INTERRUPTS();
    2660                 :             }
    2661                 :             else
    2662            4590 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2663                 :         }
    2664                 :     }
    2665                 : 
    2666             200 :     Assert(state->memtupcount == state->bound);
    2667             200 :     state->status = TSS_BOUNDED;
    2668             200 : }
    2669                 : 
    2670                 : /*
    2671                 :  * Convert the bounded heap to a properly-sorted array
    2672                 :  */
    2673                 : static void
    2674             200 : sort_bounded_heap(Tuplesortstate *state)
    2675                 : {
    2676             200 :     int         tupcount = state->memtupcount;
    2677                 : 
    2678             200 :     Assert(state->status == TSS_BOUNDED);
    2679             200 :     Assert(state->bounded);
    2680             200 :     Assert(tupcount == state->bound);
    2681             200 :     Assert(SERIAL(state));
    2682                 : 
    2683                 :     /*
    2684                 :      * We can unheapify in place because each delete-top call will remove the
    2685                 :      * largest entry, which we can promptly store in the newly freed slot at
    2686                 :      * the end.  Once we're down to a single-entry heap, we're done.
    2687                 :      */
    2688            9274 :     while (state->memtupcount > 1)
    2689                 :     {
    2690            9074 :         SortTuple   stup = state->memtuples[0];
    2691                 : 
    2692                 :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2693            9074 :         tuplesort_heap_delete_top(state);
    2694            9074 :         state->memtuples[state->memtupcount] = stup;
    2695                 :     }
    2696             200 :     state->memtupcount = tupcount;
    2697                 : 
    2698                 :     /*
    2699                 :      * Reverse sort direction back to the original state.  This is not
    2700                 :      * actually necessary but seems like a good idea for tidiness.
    2701                 :      */
    2702             200 :     reversedirection(state);
    2703                 : 
    2704             200 :     state->status = TSS_SORTEDINMEM;
    2705             200 :     state->boundUsed = true;
    2706             200 : }
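
A worked example with bound = 2 and input 5, 1, 4, 2: make_bounded_heap inserts 5 and 1 into the direction-reversed heap, leaving the largest (5) at the root; 4 compares smaller than 5, so it replaces the top, and then 2 replaces 4, leaving {2, 1}.  sort_bounded_heap then repeatedly moves the root into the slot freed at the array's end, producing 1, 2 in ascending order.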
    2707                 : 
    2708                 : /*
    2709                 :  * Sort all memtuples using specialized qsort() routines.
    2710                 :  *
    2711                 :  * Quicksort is used both for small in-memory sorts and for external sort runs.
    2712                 :  */
    2713                 : static void
    2714          135335 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2715                 : {
    2716          135335 :     Assert(!LEADER(state));
    2717                 : 
    2718          135335 :     if (state->memtupcount > 1)
    2719                 :     {
    2720                 :         /*
    2721                 :          * Do we have the leading column's value or abbreviation in datum1,
    2722                 :          * and is there a specialization for its comparator?
    2723                 :          */
    2724 GNC       41869 :         if (state->base.haveDatum1 && state->base.sortKeys)
    2725                 :         {
    2726           41856 :             if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
    2727                 :             {
    2728 GIC        2959 :                 qsort_tuple_unsigned(state->memtuples,
    2729            2959 :                                      state->memtupcount,
    2730                 :                                      state);
    2731            2947 :                 return;
    2732                 :             }
    2733                 : #if SIZEOF_DATUM >= 8
    2734 GNC       38897 :             else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
    2735                 :             {
    2736 GIC         497 :                 qsort_tuple_signed(state->memtuples,
    2737             497 :                                    state->memtupcount,
    2738                 :                                    state);
    2739             497 :                 return;
    2740                 :             }
    2741                 : #endif
    2742 GNC       38400 :             else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
    2743                 :             {
    2744 GIC       18636 :                 qsort_tuple_int32(state->memtuples,
    2745           18636 :                                   state->memtupcount,
    2746                 :                                   state);
    2747           18606 :                 return;
    2748                 :             }
    2749                 :         }
    2750                 : 
    2751                 :         /* Can we use the single-key sort function? */
    2752 GNC       19777 :         if (state->base.onlyKey != NULL)
    2753                 :         {
    2754 GIC        2972 :             qsort_ssup(state->memtuples, state->memtupcount,
    2755 GNC        2972 :                        state->base.onlyKey);
    2756                 :         }
    2757                 :         else
    2758                 :         {
    2759 GIC       16805 :             qsort_tuple(state->memtuples,
    2760           16805 :                         state->memtupcount,
    2761                 :                         state->base.comparetup,
    2762                 :                         state);
    2763                 :         }
    2764                 :     }
    2765                 : }
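
The specialized branches above are selected by comparing the stored comparator
function pointer against known comparators.  A self-contained sketch of that
dispatch pattern, with an insertion sort standing in for the specialized
quicksort; cmp_int and sort_int_specialized are illustrative names, not
PostgreSQL APIs.

#include <stdlib.h>

typedef int (*cmp_fn) (const void *, const void *);

static int
cmp_int(const void *a, const void *b)
{
    int         x = *(const int *) a;
    int         y = *(const int *) b;

    return (x > y) - (x < y);
}

/* specialization with the comparison inlined, avoiding indirect calls */
static void
sort_int_specialized(int *a, size_t n)
{
    for (size_t i = 1; i < n; i++)
    {
        int         key = a[i];
        size_t      j = i;

        while (j > 0 && a[j - 1] > key)
        {
            a[j] = a[j - 1];
            j--;
        }
        a[j] = key;
    }
}

static void
sort_dispatch(int *array, size_t n, cmp_fn cmp)
{
    /* pointer equality identifies a comparator we have a fast path for */
    if (cmp == cmp_int)
        sort_int_specialized(array, n);
    else
        qsort(array, n, sizeof(int), cmp);
}
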
    2766                 : 
    2767                 : /*
    2768                 :  * Insert a new tuple into an empty or existing heap, maintaining the
    2769                 :  * heap invariant.  Caller is responsible for ensuring there's room.
    2770                 :  *
    2771                 :  * Note: For some callers, tuple points to a memtuples[] entry above the
    2772                 :  * end of the heap.  This is safe as long as it's not immediately adjacent
    2773                 :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    2774                 :  * is, it might get overwritten before being moved into the heap!
    2775                 :  */
    2776                 : static void
    2777           10915 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    2778                 : {
    2779                 :     SortTuple  *memtuples;
    2780                 :     int         j;
    2781                 : 
    2782           10915 :     memtuples = state->memtuples;
    2783           10915 :     Assert(state->memtupcount < state->memtupsize);
    2784                 : 
    2785           10915 :     CHECK_FOR_INTERRUPTS();
    2786                 : 
    2787                 :     /*
    2788                 :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    2789                 :      * using 1-based array indexes, not 0-based.
    2790                 :      */
    2791           10915 :     j = state->memtupcount++;
    2792           30531 :     while (j > 0)
    2793                 :     {
    2794           26524 :         int         i = (j - 1) >> 1;
    2795                 : 
    2796           26524 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    2797            6908 :             break;
    2798           19616 :         memtuples[j] = memtuples[i];
    2799           19616 :         j = i;
    2800                 :     }
    2801           10915 :     memtuples[j] = *tuple;
    2802           10915 : }
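
For reference, the 0-based index arithmetic this heap relies on; Knuth's
1-based formulation has children 2i and 2i+1 and parent j/2.

/* 0-based binary-heap index helpers matching the arithmetic above */
static inline int
heap_parent(int j)
{
    return (j - 1) >> 1;
}

static inline int
heap_left(int i)
{
    return 2 * i + 1;
}

static inline int
heap_right(int i)
{
    return 2 * i + 2;
}
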
    2803                 : 
    2804                 : /*
    2805                 :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    2806                 :  * memtupcount, and sift up to maintain the heap invariant.
    2807                 :  *
    2808                 :  * The caller has already free'd the tuple the top node points to,
    2809                 :  * if necessary.
    2810                 :  */
    2811                 : static void
    2812           10700 : tuplesort_heap_delete_top(Tuplesortstate *state)
    2813                 : {
    2814           10700 :     SortTuple  *memtuples = state->memtuples;
    2815                 :     SortTuple  *tuple;
    2816                 : 
    2817           10700 :     if (--state->memtupcount <= 0)
    2818             330 :         return;
    2819                 : 
    2820                 :     /*
    2821                 :      * Remove the last tuple in the heap and re-insert it by replacing the
    2822                 :      * current top node with it.
    2823                 :      */
    2824           10370 :     tuple = &memtuples[state->memtupcount];
    2825           10370 :     tuplesort_heap_replace_top(state, tuple);
    2826                 : }
    2827                 : 
    2828                 : /*
    2829                 :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    2830                 :  * maintain the heap invariant.
    2831                 :  *
    2832                 :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    2833                 :  * Heapsort, steps H3-H8).
    2834                 :  */
    2835                 : static void
    2836         4124796 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    2837                 : {
    2838         4124796 :     SortTuple  *memtuples = state->memtuples;
    2839                 :     unsigned int i,
    2840                 :                 n;
    2841                 : 
    2842         4124796 :     Assert(state->memtupcount >= 1);
    2843                 : 
    2844         4124796 :     CHECK_FOR_INTERRUPTS();
    2845                 : 
    2846                 :     /*
    2847                 :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    2848                 :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    2849                 :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    2850                 :      */
    2851         4124796 :     n = state->memtupcount;
    2852         4124796 :     i = 0;                      /* i is where the "hole" is */
    2853                 :     for (;;)
    2854         1109384 :     {
    2855         5234180 :         unsigned int j = 2 * i + 1;
    2856                 : 
    2857         5234180 :         if (j >= n)
    2858         1010574 :             break;
    2859         5638577 :         if (j + 1 < n &&
    2860         1414971 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    2861          575668 :             j++;
    2862         4223606 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    2863         3114222 :             break;
    2864         1109384 :         memtuples[i] = memtuples[j];
    2865         1109384 :         i = j;
    2866                 :     }
    2867         4124796 :     memtuples[i] = *tuple;
    2868         4124796 : }
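
A worked example of the overflow rationale in the comment above, assuming
32-bit int; this standalone program is illustrative only.

#include <limits.h>
#include <stdio.h>

int
main(void)
{
    unsigned int i = 1200000000u;   /* a plausible heap index below INT_MAX */
    unsigned int j = 2 * i + 1;     /* 2400000001: fine as unsigned */

    /*
     * Had i been a signed int, 2 * i + 1 would exceed INT_MAX (2147483647),
     * which is undefined behavior.  Since i < n <= INT_MAX <= UINT_MAX / 2,
     * the unsigned result always fits.
     */
    printf("%u > %d\n", j, INT_MAX);
    return 0;
}
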
    2869                 : 
    2870                 : /*
    2871                 :  * Function to reverse the sort direction from its current state
    2872                 :  *
    2873                 :  * It is not safe to call this when performing hash tuplesorts
    2874                 :  */
    2875                 : static void
    2876             400 : reversedirection(Tuplesortstate *state)
    2877                 : {
    2878 GNC         400 :     SortSupport sortKey = state->base.sortKeys;
    2879                 :     int         nkey;
    2880                 : 
    2881             980 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    2882                 :     {
    2883 GIC         580 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    2884             580 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    2885                 :     }
    2886             400 : }
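
Toggling ssup_reverse works because the per-key comparison result is inverted
after the comparator runs, as PostgreSQL's ApplySortComparator does.  A
minimal sketch of that pattern; apply_direction is an illustrative name.

#include <stdbool.h>

static int
apply_direction(int raw_cmp, bool ssup_reverse)
{
    /* flipping the flag at runtime reverses the effective sort order */
    return ssup_reverse ? -raw_cmp : raw_cmp;
}
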
    2887                 : 
    2888                 : 
    2889                 : /*
    2890                 :  * Tape interface routines
    2891                 :  */
    2892                 : 
    2893                 : static unsigned int
    2894         3998265 : getlen(LogicalTape *tape, bool eofOK)
    2895                 : {
    2896                 :     unsigned int len;
    2897                 : 
    2898         3998265 :     if (LogicalTapeRead(tape,
    2899                 :                         &len, sizeof(len)) != sizeof(len))
    2900 UIC           0 :         elog(ERROR, "unexpected end of tape");
    2901 GIC     3998265 :     if (len == 0 && !eofOK)
    2902 UIC           0 :         elog(ERROR, "unexpected end of data");
    2903 GIC     3998265 :     return len;
    2904                 : }
    2905                 : 
    2906                 : static void
    2907            1825 : markrunend(LogicalTape *tape)
    2908                 : {
    2909            1825 :     unsigned int len = 0;
    2910                 : 
    2911 GNC        1825 :     LogicalTapeWrite(tape, &len, sizeof(len));
    2912 GIC        1825 : }
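
Together, getlen() and markrunend() imply a simple on-tape framing: each tuple
is written as a length word followed by its payload, and a zero length word
terminates the run.  A sketch of the writer side, with a stdio FILE standing
in for a LogicalTape; write_item and end_run are illustrative names.

#include <stdio.h>

static void
write_item(FILE *tape, const void *data, unsigned int len)
{
    fwrite(&len, sizeof(len), 1, tape); /* length word, read back by getlen() */
    fwrite(data, len, 1, tape);         /* payload */
}

static void
end_run(FILE *tape)
{
    unsigned int len = 0;

    fwrite(&len, sizeof(len), 1, tape); /* zero length word ends the run */
}
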
    2913                 : 
    2914                 : /*
    2915                 :  * Get memory for tuple from within READTUP() routine.
    2916                 :  *
    2917                 :  * We use the next free slot from the slab allocator, or palloc() if the tuple
    2918                 :  * is too large for that.
    2919                 :  */
    2920                 : void *
    2921 GNC     3846345 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    2922                 : {
    2923                 :     SlabSlot   *buf;
    2924                 : 
    2925                 :     /*
    2926                 :      * We pre-allocate enough slots in the slab arena that we should never run
    2927                 :      * out.
    2928                 :      */
    2929 GIC     3846345 :     Assert(state->slabFreeHead);
    2930                 : 
    2931         3846345 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    2932 UNC           0 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    2933                 :     else
    2934                 :     {
    2935 GIC     3846345 :         buf = state->slabFreeHead;
    2936                 :         /* Reuse this slot */
    2937         3846345 :         state->slabFreeHead = buf->nextfree;
    2938                 : 
    2939         3846345 :         return buf;
    2940                 :     }
    2941                 : }
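
The slab slots form an intrusive free list: a free slot stores the pointer to
the next free slot in its own memory, so allocation is a pointer pop and
release is a push.  A minimal sketch under that assumption; slab_slot,
slab_alloc, and slab_release are illustrative names.

#include <stdlib.h>

#define SLOT_SIZE 1024

typedef union slab_slot
{
    union slab_slot *nextfree;  /* valid only while the slot is free */
    char        buffer[SLOT_SIZE];
} slab_slot;

static slab_slot *free_head;

static void *
slab_alloc(size_t len)
{
    slab_slot  *slot;

    /*
     * Oversized requests fall back to the general allocator; a real
     * implementation must remember which allocations did so.
     */
    if (len > SLOT_SIZE || free_head == NULL)
        return malloc(len);

    slot = free_head;
    free_head = slot->nextfree; /* pop */
    return slot;
}

static void
slab_release(slab_slot *slot)
{
    slot->nextfree = free_head; /* push */
    free_head = slot;
}
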
    2942                 : 
    2943                 : 
    2944                 : /*
    2945                 :  * Parallel sort routines
    2946                 :  */
    2947                 : 
    2948                 : /*
    2949                 :  * tuplesort_estimate_shared - estimate required shared memory allocation
    2950                 :  *
    2951                 :  * nWorkers is an estimate of the number of workers (it's the number that
    2952                 :  * will be requested).
    2953                 :  */
    2954                 : Size
    2955              71 : tuplesort_estimate_shared(int nWorkers)
    2956                 : {
    2957                 :     Size        tapesSize;
    2958                 : 
    2959              71 :     Assert(nWorkers > 0);
    2960                 : 
    2961                 :     /* Make sure that BufFile shared state is MAXALIGN'd */
    2962              71 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    2963              71 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    2964                 : 
    2965              71 :     return tapesSize;
    2966                 : }
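
This is the usual sizing pattern for a struct that ends in a flexible array
member: offsetof to the array, plus n elements, rounded up for alignment.  A
small sketch; shared_header and ALIGN8 are illustrative stand-ins for
Sharedsort and MAXALIGN.

#include <stddef.h>

typedef struct shared_header
{
    int         nitems;
    long        items[];        /* flexible array member */
} shared_header;

#define ALIGN8(x) (((x) + 7) & ~(size_t) 7)

static size_t
estimate_shared(int n)
{
    /* header bytes up to the array, then one element per participant */
    return ALIGN8(offsetof(shared_header, items) + n * sizeof(long));
}
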
    2967                 : 
    2968                 : /*
    2969                 :  * tuplesort_initialize_shared - initialize shared tuplesort state
    2970                 :  *
    2971                 :  * Must be called from leader process before workers are launched, to
    2972                 :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    2973                 :  * should match the argument passed to tuplesort_estimate_shared().
    2974                 :  */
    2975                 : void
    2976             103 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    2977                 : {
    2978                 :     int         i;
    2979                 : 
    2980             103 :     Assert(nWorkers > 0);
    2981                 : 
    2982             103 :     SpinLockInit(&shared->mutex);
    2983             103 :     shared->currentWorker = 0;
    2984             103 :     shared->workersFinished = 0;
    2985             103 :     SharedFileSetInit(&shared->fileset, seg);
    2986             103 :     shared->nTapes = nWorkers;
    2987             309 :     for (i = 0; i < nWorkers; i++)
    2988                 :     {
    2989             206 :         shared->tapes[i].firstblocknumber = 0L;
    2990                 :     }
    2991             103 : }
    2992                 : 
    2993                 : /*
    2994                 :  * tuplesort_attach_shared - attach to shared tuplesort state
    2995                 :  *
    2996                 :  * Must be called by all worker processes.
    2997                 :  */
    2998                 : void
    2999             103 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    3000                 : {
    3001                 :     /* Attach to SharedFileSet */
    3002             103 :     SharedFileSetAttach(&shared->fileset, seg);
    3003             103 : }
    3004                 : 
    3005                 : /*
    3006                 :  * worker_get_identifier - Assign and return ordinal identifier for worker
    3007                 :  *
    3008                 :  * The order in which these are assigned is not well defined, and should not
    3009                 :  * matter; worker numbers across parallel sort participants need only be
    3010                 :  * distinct and gapless.  logtape.c requires this.
    3011                 :  *
    3012                 :  * Note that the identifiers assigned from here have no relation to
    3013                 :  * ParallelWorkerNumber number, to avoid making any assumption about
    3014                 :  * ParallelWorkerNumber, to avoid making any assumption about the
    3015                 :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    3016                 :  * includes the leader, as well as serial Tuplesort processes.
    3017                 :  */
    3018                 : static int
    3019             206 : worker_get_identifier(Tuplesortstate *state)
    3020                 : {
    3021             206 :     Sharedsort *shared = state->shared;
    3022                 :     int         worker;
    3023                 : 
    3024             206 :     Assert(WORKER(state));
    3025                 : 
    3026             206 :     SpinLockAcquire(&shared->mutex);
    3027             206 :     worker = shared->currentWorker++;
    3028             206 :     SpinLockRelease(&shared->mutex);
    3029                 : 
    3030             206 :     return worker;
    3031                 : }
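
Distinct, gapless identifiers fall out of incrementing a shared counter under
a lock, in whatever order the workers happen to arrive.  A minimal pthreads
sketch of the same pattern; all names are illustrative.

#include <pthread.h>

static pthread_mutex_t id_mutex = PTHREAD_MUTEX_INITIALIZER;
static int  current_worker = 0;

static int
get_identifier(void)
{
    int         worker;

    pthread_mutex_lock(&id_mutex);
    worker = current_worker++;  /* first caller gets 0, next gets 1, ... */
    pthread_mutex_unlock(&id_mutex);

    return worker;
}
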
    3032                 : 
    3033                 : /*
    3034                 :  * worker_freeze_result_tape - freeze worker's result tape for leader
    3035                 :  *
    3036                 :  * This is called by workers just after the result tape has been determined,
    3037                 :  * instead of calling LogicalTapeFreeze() directly.  They do so because
    3038                 :  * workers require a few additional steps beyond the similar serial
    3039                 :  * TSS_SORTEDONTAPE external sort case, and those steps also happen here.
    3040                 :  * The extra steps involve freeing now-unneeded resources and signaling to
    3041                 :  * the leader that the worker's run is available as input to its merge.
    3042                 :  *
    3043                 :  * There should only be one final output run for each worker, which consists
    3044                 :  * of all tuples that were originally input into the worker.
    3045                 :  */
    3046                 : static void
    3047             206 : worker_freeze_result_tape(Tuplesortstate *state)
    3048                 : {
    3049             206 :     Sharedsort *shared = state->shared;
    3050                 :     TapeShare   output;
    3051                 : 
    3052             206 :     Assert(WORKER(state));
    3053             206 :     Assert(state->result_tape != NULL);
    3054             206 :     Assert(state->memtupcount == 0);
    3055                 : 
    3056                 :     /*
    3057                 :      * Free most remaining memory, in case caller is sensitive to our holding
    3058                 :      * on to it.  memtuples may not be a tiny merge heap at this point.
    3059                 :      */
    3060             206 :     pfree(state->memtuples);
    3061                 :     /* Be tidy */
    3062             206 :     state->memtuples = NULL;
    3063             206 :     state->memtupsize = 0;
    3064                 : 
    3065                 :     /*
    3066                 :      * The worker's result tape metadata must be stored in shared memory,
    3067                 :      * where the leader can later find it.
    3068                 :      */
    3069             206 :     LogicalTapeFreeze(state->result_tape, &output);
    3070                 : 
    3071                 :     /* Store properties of output tape, and update finished worker count */
    3072             206 :     SpinLockAcquire(&shared->mutex);
    3073             206 :     shared->tapes[state->worker] = output;
    3074             206 :     shared->workersFinished++;
    3075             206 :     SpinLockRelease(&shared->mutex);
    3076             206 : }
    3077                 : 
    3078                 : /*
    3079                 :  * worker_nomergeruns - dump memtuples in worker, without merging
    3080                 :  *
    3081                 :  * This is called in a worker, as an alternative to mergeruns(), when no
    3082                 :  * merging is required.
    3083                 :  */
    3084                 : static void
    3085             205 : worker_nomergeruns(Tuplesortstate *state)
    3086                 : {
    3087             205 :     Assert(WORKER(state));
    3088             205 :     Assert(state->result_tape == NULL);
    3089             205 :     Assert(state->nOutputRuns == 1);
    3090                 : 
    3091             205 :     state->result_tape = state->destTape;
    3092             205 :     worker_freeze_result_tape(state);
    3093             205 : }
    3094                 : 
    3095                 : /*
    3096                 :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3097                 :  *
    3098                 :  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
    3099                 :  * sorting has occurred in workers, all of which must have already returned
    3100                 :  * from tuplesort_performsort().
    3101                 :  *
    3102                 :  * When this returns, the leader process is left in a state virtually
    3103                 :  * indistinguishable from having generated the runs itself, as a serial
    3104                 :  * external sort would have.
    3105                 :  */
    3106                 : static void
    3107              71 : leader_takeover_tapes(Tuplesortstate *state)
    3108                 : {
    3109              71 :     Sharedsort *shared = state->shared;
    3110              71 :     int         nParticipants = state->nParticipants;
    3111                 :     int         workersFinished;
    3112                 :     int         j;
    3113                 : 
    3114              71 :     Assert(LEADER(state));
    3115              71 :     Assert(nParticipants >= 1);
    3116                 : 
    3117              71 :     SpinLockAcquire(&shared->mutex);
    3118              71 :     workersFinished = shared->workersFinished;
    3119              71 :     SpinLockRelease(&shared->mutex);
    3120                 : 
    3121              71 :     if (nParticipants != workersFinished)
    3122 UIC           0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3123                 : 
    3124                 :     /*
    3125                 :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3126                 :      * the end.  Parallel workers are far more expensive than logical tapes,
    3127                 :      * so the number of tapes allocated here should never be excessive.
    3128                 :      */
    3129 GIC          71 :     inittapestate(state, nParticipants);
    3130              71 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3131                 : 
    3132                 :     /*
    3133                 :      * Set currentRun to reflect the number of runs we will merge (it's not
    3134                 :      * used for anything, this is just pro forma)
    3135                 :      */
    3136              71 :     state->currentRun = nParticipants;
    3137                 : 
    3138                 :     /*
    3139                 :      * Initialize the state to look the same as after building the initial
    3140                 :      * runs.
    3141                 :      *
    3142                 :      * There will always be exactly 1 run per worker, and exactly one input
    3143                 :      * tape per run, because workers always output exactly 1 run, even when
    3144                 :      * there were no input tuples for workers to sort.
    3145                 :      */
    3146              71 :     state->inputTapes = NULL;
    3147              71 :     state->nInputTapes = 0;
    3148              71 :     state->nInputRuns = 0;
    3149                 : 
    3150              71 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3151              71 :     state->nOutputTapes = nParticipants;
    3152              71 :     state->nOutputRuns = nParticipants;
    3153                 : 
    3154             213 :     for (j = 0; j < nParticipants; j++)
    3155                 :     {
    3156             142 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3157                 :     }
    3158                 : 
    3159              71 :     state->status = TSS_BUILDRUNS;
    3160              71 : }
    3161                 : 
    3162                 : /*
    3163                 :  * Convenience routine to free a tuple previously loaded into sort memory
    3164                 :  */
    3165                 : static void
    3166         1872895 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3167                 : {
    3168         1872895 :     if (stup->tuple)
    3169                 :     {
    3170         1791797 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3171         1791797 :         pfree(stup->tuple);
    3172         1791797 :         stup->tuple = NULL;
    3173                 :     }
    3174         1872895 : }
    3175                 : 
    3176                 : int
    3177         2074284 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3178                 : {
    3179         2074284 :     if (x < y)
    3180 UIC           0 :         return -1;
    3181 GIC     2074284 :     else if (x > y)
    3182 UIC           0 :         return 1;
    3183                 :     else
    3184 GIC     2074284 :         return 0;
    3185                 : }
    3186                 : 
    3187                 : #if SIZEOF_DATUM >= 8
    3188                 : int
    3189          562926 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3190                 : {
    3191          562926 :     int64       xx = DatumGetInt64(x);
    3192          562926 :     int64       yy = DatumGetInt64(y);
    3193                 : 
    3194          562926 :     if (xx < yy)
    3195          208602 :         return -1;
    3196          354324 :     else if (xx > yy)
    3197          173964 :         return 1;
    3198                 :     else
    3199          180360 :         return 0;
    3200                 : }
    3201                 : #endif
    3202                 : 
    3203                 : int
    3204       118722719 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3205                 : {
    3206       118722719 :     int32       xx = DatumGetInt32(x);
    3207       118722719 :     int32       yy = DatumGetInt32(y);
    3208                 : 
    3209       118722719 :     if (xx < yy)
    3210        25155852 :         return -1;
    3211        93566867 :     else if (xx > yy)
    3212        23604938 :         return 1;
    3213                 :     else
    3214        69961929 :         return 0;
    3215                 : }
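
The branchy three-way form above avoids the classic "return x - y" comparator
bug: for 32-bit operands the subtraction can overflow and flip sign (e.g.
x = INT32_MIN, y = 1 wraps to INT32_MAX, claiming x > y).  A branch-free
alternative that is equally safe:

#include <stdint.h>

static int
int32_cmp_safe(int32_t x, int32_t y)
{
    /* yields -1, 0, or +1; no intermediate can overflow */
    return (x > y) - (x < y);
}
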
        

Generated by: LCOV version v1.16-55-g56c0a2a