LCOV - differential code coverage report
Current view: top level - src/backend/utils/sort - tuplestore.c (source / functions) Coverage Total Hit UNC UBC GIC GNC CBC ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 78.6 % 457 359 1 97 11 9 339 13 1 7
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 27 27 3 2 22 3
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * tuplestore.c
       4                 :  *    Generalized routines for temporary tuple storage.
       5                 :  *
       6                 :  * This module handles temporary storage of tuples for purposes such
       7                 :  * as Materialize nodes, hashjoin batch files, etc.  It is essentially
       8                 :  * a dumbed-down version of tuplesort.c; it does no sorting of tuples
       9                 :  * but can only store and regurgitate a sequence of tuples.  However,
      10                 :  * because no sort is required, it is allowed to start reading the sequence
      11                 :  * before it has all been written.  This is particularly useful for cursors,
      12                 :  * because it allows random access within the already-scanned portion of
      13                 :  * a query without having to process the underlying scan to completion.
      14                 :  * Also, it is possible to support multiple independent read pointers.
      15                 :  *
      16                 :  * A temporary file is used to handle the data if it exceeds the
      17                 :  * space limit specified by the caller.
      18                 :  *
      19                 :  * The (approximate) amount of memory allowed to the tuplestore is specified
      20                 :  * in kilobytes by the caller.  We absorb tuples and simply store them in an
      21                 :  * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
      22                 :  * maxKBytes, we dump all the tuples into a temp file and then read from that
      23                 :  * when needed.
      24                 :  *
      25                 :  * Upon creation, a tuplestore supports a single read pointer, numbered 0.
      26                 :  * Additional read pointers can be created using tuplestore_alloc_read_pointer.
      27                 :  * Mark/restore behavior is supported by copying read pointers.
      28                 :  *
      29                 :  * When the caller requests backward-scan capability, we write the temp file
      30                 :  * in a format that allows either forward or backward scan.  Otherwise, only
      31                 :  * forward scan is allowed.  A request for backward scan must be made before
      32                 :  * putting any tuples into the tuplestore.  Rewind is normally allowed but
      33                 :  * can be turned off via tuplestore_set_eflags; turning off rewind for all
      34                 :  * read pointers enables truncation of the tuplestore at the oldest read point
      35                 :  * for minimal memory usage.  (The caller must explicitly call tuplestore_trim
      36                 :  * at appropriate times for truncation to actually happen.)
      37                 :  *
      38                 :  * Note: in TSS_WRITEFILE state, the temp file's seek position is the
      39                 :  * current write position, and the write-position variables in the tuplestore
      40                 :  * aren't kept up to date.  Similarly, in TSS_READFILE state the temp file's
      41                 :  * seek position is the active read pointer's position, and that read pointer
      42                 :  * isn't kept up to date.  We update the appropriate variables using BufFileTell()
      43                 :  * before switching to the other state or activating a different read pointer.
      44                 :  *
      45                 :  *
      46                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      47                 :  * Portions Copyright (c) 1994, Regents of the University of California
      48                 :  *
      49                 :  * IDENTIFICATION
      50                 :  *    src/backend/utils/sort/tuplestore.c
      51                 :  *
      52                 :  *-------------------------------------------------------------------------
      53                 :  */
      54                 : 
      55                 : #include "postgres.h"
      56                 : 
      57                 : #include <limits.h>
      58                 : 
      59                 : #include "access/htup_details.h"
      60                 : #include "commands/tablespace.h"
      61                 : #include "executor/executor.h"
      62                 : #include "miscadmin.h"
      63                 : #include "storage/buffile.h"
      64                 : #include "utils/memutils.h"
      65                 : #include "utils/resowner.h"
      66                 : 
      67                 : 
      68                 : /*
      69                 :  * Possible states of a Tuplestore object.  These denote the states that
      70                 :  * persist between calls of Tuplestore routines.
      71                 :  */
      72                 : typedef enum
      73                 : {
      74                 :     TSS_INMEM,                  /* Tuples still fit in memory */
      75                 :     TSS_WRITEFILE,              /* Writing to temp file */
      76                 :     TSS_READFILE                /* Reading from temp file */
      77                 : } TupStoreStatus;
      78                 : 
      79                 : /*
      80                 :  * State for a single read pointer.  If we are in state INMEM then all the
      81                 :  * read pointers' "current" fields denote the read positions.  In state
      82                 :  * WRITEFILE, the file/offset fields denote the read positions.  In state
      83                 :  * READFILE, inactive read pointers have valid file/offset, but the active
      84                 :  * read pointer implicitly has position equal to the temp file's seek position.
      85                 :  *
      86                 :  * Special case: if eof_reached is true, then the pointer's read position is
      87                 :  * implicitly equal to the write position, and current/file/offset aren't
      88                 :  * maintained.  This way we need not update all the read pointers each time
      89                 :  * we write.
      90                 :  */
      91                 : typedef struct
      92                 : {
      93                 :     int         eflags;         /* capability flags */
      94                 :     bool        eof_reached;    /* read has reached EOF */
      95                 :     int         current;        /* next array index to read */
      96                 :     int         file;           /* temp file# */
      97                 :     off_t       offset;         /* byte offset in file */
      98                 : } TSReadPointer;
      99                 : 
     100                 : /*
     101                 :  * Private state of a Tuplestore operation.
     102                 :  */
     103                 : struct Tuplestorestate
     104                 : {
     105                 :     TupStoreStatus status;      /* enumerated value as shown above */
     106                 :     int         eflags;         /* capability flags (OR of pointers' flags) */
     107                 :     bool        backward;       /* store extra length words in file? */
     108                 :     bool        interXact;      /* keep open through transactions? */
     109                 :     bool        truncated;      /* tuplestore_trim has removed tuples? */
     110                 :     int64       availMem;       /* remaining memory available, in bytes */
     111                 :     int64       allowedMem;     /* total memory allowed, in bytes */
     112                 :     int64       tuples;         /* number of tuples added */
     113                 :     BufFile    *myfile;         /* underlying file, or NULL if none */
     114                 :     MemoryContext context;      /* memory context for holding tuples */
     115                 :     ResourceOwner resowner;     /* resowner for holding temp files */
     116                 : 
     117                 :     /*
     118                 :      * These function pointers decouple the routines that must know what kind
     119                 :      * of tuple we are handling from the routines that don't need to know it.
     120                 :      * They are set up by the tuplestore_begin_xxx routines.
     121                 :      *
     122                 :      * (Although tuplestore.c currently only supports heap tuples, I've copied
     123                 :      * this part of tuplesort.c so that extension to other kinds of objects
     124                 :      * will be easy if it's ever needed.)
     125                 :      *
     126                 :      * Function to copy a supplied input tuple into palloc'd space. (NB: we
     127                 :      * assume that a single pfree() is enough to release the tuple later, so
     128                 :      * the representation must be "flat" in one palloc chunk.) state->availMem
     129                 :      * must be decreased by the amount of space used.
     130                 :      */
     131                 :     void       *(*copytup) (Tuplestorestate *state, void *tup);
     132                 : 
     133                 :     /*
     134                 :      * Function to write a stored tuple onto tape.  The representation of the
     135                 :      * tuple on tape need not be the same as it is in memory; requirements on
     136                 :      * the tape representation are given below.  After writing the tuple,
     137                 :      * pfree() it, and increase state->availMem by the amount of memory space
     138                 :      * thereby released.
     139                 :      */
     140                 :     void        (*writetup) (Tuplestorestate *state, void *tup);
     141                 : 
     142                 :     /*
     143                 :      * Function to read a stored tuple from tape back into memory. 'len' is
     144                 :      * the already-read length of the stored tuple.  Create and return a
     145                 :      * palloc'd copy, and decrease state->availMem by the amount of memory
     146                 :      * space consumed.
     147                 :      */
     148                 :     void       *(*readtup) (Tuplestorestate *state, unsigned int len);
     149                 : 
     150                 :     /*
     151                 :      * This array holds pointers to tuples in memory if we are in state INMEM.
     152                 :      * In states WRITEFILE and READFILE it's not used.
     153                 :      *
     154                 :      * When memtupdeleted > 0, the first memtupdeleted pointers are already
     155                 :      * released due to a tuplestore_trim() operation, but we haven't expended
     156                 :      * the effort to slide the remaining pointers down.  These unused pointers
     157                 :      * are set to NULL to catch any invalid accesses.  Note that memtupcount
     158                 :      * includes the deleted pointers.
     159                 :      */
     160                 :     void      **memtuples;      /* array of pointers to palloc'd tuples */
     161                 :     int         memtupdeleted;  /* the first N slots are currently unused */
     162                 :     int         memtupcount;    /* number of tuples currently present */
     163                 :     int         memtupsize;     /* allocated length of memtuples array */
     164                 :     bool        growmemtuples;  /* memtuples' growth still underway? */
     165                 : 
     166                 :     /*
     167                 :      * These variables are used to keep track of the current positions.
     168                 :      *
     169                 :      * In state WRITEFILE, the current file seek position is the write point;
     170                 :      * in state READFILE, the write position is remembered in writepos_xxx.
     171                 :      * (The write position is the same as EOF, but since BufFileSeek doesn't
     172                 :      * currently implement SEEK_END, we have to remember it explicitly.)
     173                 :      */
     174                 :     TSReadPointer *readptrs;    /* array of read pointers */
     175                 :     int         activeptr;      /* index of the active read pointer */
     176                 :     int         readptrcount;   /* number of pointers currently valid */
     177                 :     int         readptrsize;    /* allocated length of readptrs array */
     178                 : 
     179                 :     int         writepos_file;  /* file# (valid if READFILE state) */
     180                 :     off_t       writepos_offset;    /* offset (valid if READFILE state) */
     181                 : };
     182                 : 
     183                 : #define COPYTUP(state,tup)  ((*(state)->copytup) (state, tup))
     184                 : #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
     185                 : #define READTUP(state,len)  ((*(state)->readtup) (state, len))
     186                 : #define LACKMEM(state)      ((state)->availMem < 0)
     187                 : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     188                 : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     189                 : 
     190                 : /*--------------------
     191                 :  *
     192                 :  * NOTES about on-tape representation of tuples:
     193                 :  *
     194                 :  * We require the first "unsigned int" of a stored tuple to be the total size
     195                 :  * on-tape of the tuple, including itself (so it is never zero).
     196                 :  * The remainder of the stored tuple
     197                 :  * may or may not match the in-memory representation of the tuple ---
     198                 :  * any conversion needed is the job of the writetup and readtup routines.
     199                 :  *
     200                 :  * If state->backward is true, then the stored representation of
     201                 :  * the tuple must be followed by another "unsigned int" that is a copy of the
     202                 :  * length --- so the total tape space used is actually sizeof(unsigned int)
     203                 :  * more than the stored length value.  This allows read-backwards.  When
     204                 :  * state->backward is not set, the write/read routines may omit the extra
     205                 :  * length word.
     206                 :  *
     207                 :  * writetup is expected to write both length words as well as the tuple
     208                 :  * data.  When readtup is called, the tape is positioned just after the
     209                 :  * front length word; readtup must read the tuple data and advance past
     210                 :  * the back length word (if present).
     211                 :  *
     212                 :  * The write/read routines can make use of the tuple description data
     213                 :  * stored in the Tuplestorestate record, if needed. They are also expected
     214                 :  * to adjust state->availMem by the amount of memory space (not tape space!)
     215                 :  * released or consumed.  There is no error return from either writetup
     216                 :  * or readtup; they should ereport() on failure.
     217                 :  *
     218                 :  *
     219                 :  * NOTES about memory consumption calculations:
     220                 :  *
     221                 :  * We count space allocated for tuples against the maxKBytes limit,
     222                 :  * plus the space used by the variable-size array memtuples.
     223                 :  * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
     224                 :  * We don't worry about the size of the read pointer array, either.
     225                 :  *
     226                 :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     227                 :  * rather than the originally-requested size.  This is important since
     228                 :  * palloc can add substantial overhead.  It's not a complete answer since
     229                 :  * we won't count any wasted space in palloc allocation blocks, but it's
     230                 :  * a lot better than what we were doing before 7.3.
     231                 :  *
     232                 :  *--------------------
     233                 :  */
     234                 : 
     235                 : 
     236                 : static Tuplestorestate *tuplestore_begin_common(int eflags,
     237                 :                                                 bool interXact,
     238                 :                                                 int maxKBytes);
     239                 : static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
     240                 : static void dumptuples(Tuplestorestate *state);
     241                 : static unsigned int getlen(Tuplestorestate *state, bool eofOK);
     242                 : static void *copytup_heap(Tuplestorestate *state, void *tup);
     243                 : static void writetup_heap(Tuplestorestate *state, void *tup);
     244                 : static void *readtup_heap(Tuplestorestate *state, unsigned int len);
     245                 : 
     246                 : 
     247                 : /*
     248                 :  *      tuplestore_begin_xxx
     249                 :  *
     250                 :  * Initialize for a tuple store operation.
     251                 :  */
     252                 : static Tuplestorestate *
     253 CBC       97620 : tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
     254                 : {
     255                 :     Tuplestorestate *state;
     256                 : 
     257           97620 :     state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
     258                 : 
     259           97620 :     state->status = TSS_INMEM;
     260           97620 :     state->eflags = eflags;
     261           97620 :     state->interXact = interXact;
     262           97620 :     state->truncated = false;
     263           97620 :     state->allowedMem = maxKBytes * 1024L;
     264           97620 :     state->availMem = state->allowedMem;
     265           97620 :     state->myfile = NULL;
     266           97620 :     state->context = CurrentMemoryContext;
     267           97620 :     state->resowner = CurrentResourceOwner;
     268                 : 
     269           97620 :     state->memtupdeleted = 0;
     270           97620 :     state->memtupcount = 0;
     271           97620 :     state->tuples = 0;
     272                 : 
     273                 :     /*
     274                 :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     275                 :      * see comments in grow_memtuples().
     276                 :      */
     277           97620 :     state->memtupsize = Max(16384 / sizeof(void *),
     278                 :                             ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
     279                 : 
     280           97620 :     state->growmemtuples = true;
     281           97620 :     state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
     282                 : 
     283           97620 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     284                 : 
     285           97620 :     state->activeptr = 0;
     286           97620 :     state->readptrcount = 1;
     287           97620 :     state->readptrsize = 8;      /* arbitrary */
     288           97620 :     state->readptrs = (TSReadPointer *)
     289           97620 :         palloc(state->readptrsize * sizeof(TSReadPointer));
     290                 : 
     291           97620 :     state->readptrs[0].eflags = eflags;
     292           97620 :     state->readptrs[0].eof_reached = false;
     293           97620 :     state->readptrs[0].current = 0;
     294                 : 
     295           97620 :     return state;
     296                 : }
     297                 : 
     298                 : /*
     299                 :  * tuplestore_begin_heap
     300                 :  *
     301                 :  * Create a new tuplestore for heap tuples.  Tuplestores for other kinds
     302                 :  * of tuples are possible in principle, but are not presently
     303                 :  * implemented.
     304                 :  *
     305                 :  * randomAccess: if true, both forward and backward accesses to the
     306                 :  * tuple store are allowed.
     307                 :  *
     308                 :  * interXact: if true, the files used for on-disk storage persist beyond the
     309                 :  * end of the current transaction.  NOTE: It's the caller's responsibility to
     310                 :  * create such a tuplestore in a memory context and resource owner that will
     311                 :  * also survive transaction boundaries, and to ensure the tuplestore is closed
     312                 :  * when it's no longer wanted.
     313                 :  *
     314                 :  * maxKBytes: how much data to store in memory (any data beyond this
     315                 :  * amount is paged to disk).  When in doubt, use work_mem.
     316                 :  */
     317                 : Tuplestorestate *
     318           97620 : tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
     319                 : {
     320                 :     Tuplestorestate *state;
     321                 :     int         eflags;
     322                 : 
     323                 :     /*
     324                 :      * This interpretation of the meaning of randomAccess is compatible with
     325                 :      * the pre-8.3 behavior of tuplestores.
     326                 :      */
     327           97620 :     eflags = randomAccess ?
     328           97620 :         (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
     329                 :         (EXEC_FLAG_REWIND);
     330                 : 
     331           97620 :     state = tuplestore_begin_common(eflags, interXact, maxKBytes);
     332                 : 
     333           97620 :     state->copytup = copytup_heap;
     334           97620 :     state->writetup = writetup_heap;
     335           97620 :     state->readtup = readtup_heap;
     336                 : 
     337           97620 :     return state;
     338                 : }
     339                 : 
     340                 : /*
     341                 :  * tuplestore_set_eflags
     342                 :  *
     343                 :  * Set the capability flags for read pointer 0 at a finer grain than is
     344                 :  * allowed by tuplestore_begin_xxx.  This must be called before inserting
     345                 :  * any data into the tuplestore.
     346                 :  *
     347                 :  * eflags is a bitmask following the meanings used for executor node
     348                 :  * startup flags (see executor.h).  tuplestore pays attention to these bits:
     349                 :  *      EXEC_FLAG_REWIND        need rewind to start
     350                 :  *      EXEC_FLAG_BACKWARD      need backward fetch
     351                 :  * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
     352                 :  * is set per "randomAccess" in the tuplestore_begin_xxx call.
     353                 :  *
     354                 :  * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
     355                 :  * but not further than the truncation point (the furthest-back read pointer
     356                 :  * position at the time of the last tuplestore_trim call).
     357                 :  */
     358                 : void
     359            3892 : tuplestore_set_eflags(Tuplestorestate *state, int eflags)
     360                 : {
     361                 :     int         i;
     362                 : 
     363            3892 :     if (state->status != TSS_INMEM || state->memtupcount != 0)
     364 UBC           0 :         elog(ERROR, "too late to call tuplestore_set_eflags");
     365                 : 
     366 CBC        3892 :     state->readptrs[0].eflags = eflags;
     367            3892 :     for (i = 1; i < state->readptrcount; i++)
     368 UBC           0 :         eflags |= state->readptrs[i].eflags;
     369 CBC        3892 :     state->eflags = eflags;
     370            3892 : }
     371                 : 
     372                 : /*
     373                 :  * tuplestore_alloc_read_pointer - allocate another read pointer.
     374                 :  *
     375                 :  * Returns the pointer's index.
     376                 :  *
     377                 :  * The new pointer initially copies the position of read pointer 0.
     378                 :  * It can have its own eflags, but if any data has been inserted into
     379                 :  * the tuplestore, these eflags must not represent an increase in
     380                 :  * requirements.
     381                 :  */
     382                 : int
     383            5404 : tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
     384                 : {
     385                 :     /* Check for possible increase of requirements */
     386            5404 :     if (state->status != TSS_INMEM || state->memtupcount != 0)
     387                 :     {
     388             307 :         if ((state->eflags | eflags) != state->eflags)
     389 UBC           0 :             elog(ERROR, "too late to require new tuplestore eflags");
     390                 :     }
     391                 : 
     392                 :     /* Make room for another read pointer if needed */
     393 CBC        5404 :     if (state->readptrcount >= state->readptrsize)
     394                 :     {
     395               6 :         int         newcnt = state->readptrsize * 2;
     396                 : 
     397               6 :         state->readptrs = (TSReadPointer *)
     398               6 :             repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
     399               6 :         state->readptrsize = newcnt;
     400                 :     }
     401                 : 
     402                 :     /* And set it up */
     403            5404 :     state->readptrs[state->readptrcount] = state->readptrs[0];
     404            5404 :     state->readptrs[state->readptrcount].eflags = eflags;
     405                 : 
     406            5404 :     state->eflags |= eflags;
     407                 : 
     408            5404 :     return state->readptrcount++;
     409                 : }
     410                 : 
     411                 : /*
     412                 :  * tuplestore_clear
     413                 :  *
     414                 :  *  Delete all the contents of a tuplestore, and reset its read pointers
     415                 :  *  to the start.
     416                 :  */
     417                 : void
     418           33022 : tuplestore_clear(Tuplestorestate *state)
     419                 : {
     420                 :     int         i;
     421                 :     TSReadPointer *readptr;
     422                 : 
     423           33022 :     if (state->myfile)
     424 UBC           0 :         BufFileClose(state->myfile);
     425 CBC       33022 :     state->myfile = NULL;
     426           33022 :     if (state->memtuples)
     427                 :     {
     428           68286 :         for (i = state->memtupdeleted; i < state->memtupcount; i++)
     429                 :         {
     430           35264 :             FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
     431           35264 :             pfree(state->memtuples[i]);
     432                 :         }
     433                 :     }
     434           33022 :     state->status = TSS_INMEM;
     435           33022 :     state->truncated = false;
     436           33022 :     state->memtupdeleted = 0;
     437           33022 :     state->memtupcount = 0;
     438           33022 :     state->tuples = 0;
     439           33022 :     readptr = state->readptrs;
     440           66266 :     for (i = 0; i < state->readptrcount; readptr++, i++)
     441                 :     {
     442           33244 :         readptr->eof_reached = false;
     443           33244 :         readptr->current = 0;
     444                 :     }
     445           33022 : }
     446                 : 
     447                 : /*
     448                 :  * tuplestore_end
     449                 :  *
     450                 :  *  Release resources and clean up.
     451                 :  */
     452                 : void
     453           88486 : tuplestore_end(Tuplestorestate *state)
     454                 : {
     455                 :     int         i;
     456                 : 
     457           88486 :     if (state->myfile)
     458              59 :         BufFileClose(state->myfile);
     459           88486 :     if (state->memtuples)
     460                 :     {
     461         4195621 :         for (i = state->memtupdeleted; i < state->memtupcount; i++)
     462         4107135 :             pfree(state->memtuples[i]);
     463           88486 :         pfree(state->memtuples);
     464                 :     }
     465           88486 :     pfree(state->readptrs);
     466           88486 :     pfree(state);
     467           88486 : }
     468                 : 
     469                 : /*
     470                 :  * tuplestore_select_read_pointer - make the specified read pointer active
                         :  *
                         :  * ptr must be a valid index into state->readptrs (asserted).  Selecting
                         :  * the pointer that is already active is a no-op.  While the store is in
                         :  * TSS_INMEM or TSS_WRITEFILE status only state->activeptr changes.  In
                         :  * TSS_READFILE status the outgoing pointer's file position is saved with
                         :  * BufFileTell (unless it is at EOF) and the temp file is repositioned for
                         :  * the incoming pointer; a failed seek is reported with ereport(ERROR),
                         :  * and an unrecognized status with elog(ERROR).
     471                 :  */
     472                 : void
     473         1795002 : tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
     474                 : {
     475                 :     TSReadPointer *readptr;
     476                 :     TSReadPointer *oldptr;
     477                 : 
     478         1795002 :     Assert(ptr >= 0 && ptr < state->readptrcount);
     479                 : 
     480                 :     /* No work if already active */
     481         1795002 :     if (ptr == state->activeptr)
     482          364998 :         return;
     483                 : 
     484         1430004 :     readptr = &state->readptrs[ptr];    /* pointer becoming active */
     485         1430004 :     oldptr = &state->readptrs[state->activeptr];    /* pointer becoming inactive */
     486                 : 
     487         1430004 :     switch (state->status)
     488                 :     {
     489         1430004 :         case TSS_INMEM:
     490                 :         case TSS_WRITEFILE:
     491                 :             /* no work */
     492         1430004 :             break;
     493 UBC           0 :         case TSS_READFILE:
     494                 : 
     495                 :             /*
     496                 :              * First, save the current read position in the pointer about to
     497                 :              * become inactive.
     498                 :              */
     499               0 :             if (!oldptr->eof_reached)
     500               0 :                 BufFileTell(state->myfile,
     501                 :                             &oldptr->file,
     502                 :                             &oldptr->offset);
     503                 : 
     504                 :             /*
     505                 :              * We have to make the temp file's seek position equal to the
     506                 :              * logical position of the new read pointer.  In eof_reached
     507                 :              * state, that's the EOF, which we have available from the saved
     508                 :              * write position.
     509                 :              */
     510               0 :             if (readptr->eof_reached)
     511                 :             {
     512               0 :                 if (BufFileSeek(state->myfile,
     513                 :                                 state->writepos_file,
     514                 :                                 state->writepos_offset,
     515                 :                                 SEEK_SET) != 0)
     516               0 :                     ereport(ERROR,
     517                 :                             (errcode_for_file_access(),
     518                 :                              errmsg("could not seek in tuplestore temporary file")));
     519                 :             }
     520                 :             else
     521                 :             {
     522               0 :                 if (BufFileSeek(state->myfile,
     523                 :                                 readptr->file,
     524                 :                                 readptr->offset,
     525                 :                                 SEEK_SET) != 0)
     526               0 :                     ereport(ERROR,
     527                 :                             (errcode_for_file_access(),
     528                 :                              errmsg("could not seek in tuplestore temporary file")));
     529                 :             }
     530               0 :             break;
     531               0 :         default:
     532               0 :             elog(ERROR, "invalid tuplestore state");
     533                 :             break;
     534                 :     }
     535                 : 
     536 CBC     1430004 :     state->activeptr = ptr; /* recorded only after any repositioning succeeded */
     537                 : }
     538                 : 
     539                 : /*
     540                 :  * tuplestore_tuple_count
     541                 :  *
     542                 :  * Returns the number of tuples added since creation or the last
     543                 :  * tuplestore_clear().
                         :  *
                         :  * This is a plain read of the state->tuples counter, returned as int64;
                         :  * nothing in the tuplestore is modified.
     544                 :  */
     545                 : int64
     546            3157 : tuplestore_tuple_count(Tuplestorestate *state)
     547                 : {
     548            3157 :     return state->tuples;
     549                 : }
     550                 : 
     551                 : /*
     552                 :  * tuplestore_ateof
     553                 :  *
     554                 :  * Returns the active read pointer's eof_reached state.
                         :  *
                         :  * Only the flag of readptrs[activeptr] is read; no read pointer is moved
                         :  * and nothing is modified.
     555                 :  */
     556                 : bool
     557         2390247 : tuplestore_ateof(Tuplestorestate *state)
     558                 : {
     559         2390247 :     return state->readptrs[state->activeptr].eof_reached;
     560                 : }
     561                 : 
     562                 : /*
     563                 :  * Grow the memtuples[] array, if possible within our memory constraint.  We
     564                 :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
     565                 :  * limit.  Return true if we were able to enlarge the array, false if not.
     566                 :  *
     567                 :  * Normally, at each increment we double the size of the array.  When doing
     568                 :  * that would exceed a limit, we attempt one last, smaller increase (and then
     569                 :  * clear the growmemtuples flag so we don't try any more).  That allows us to
     570                 :  * use memory as fully as permitted; sticking to the pure doubling rule could
     571                 :  * result in almost half going unused.  Because availMem moves around with
     572                 :  * tuple addition/removal, we need some rule to prevent making repeated small
     573                 :  * increases in memtupsize, which would just be useless thrashing.  The
     574                 :  * growmemtuples flag accomplishes that and also prevents useless
     575                 :  * recalculations in this function.
                         :  *
                         :  * On success, state->memtupsize and state->memtuples are updated and the
                         :  * memory accounting is adjusted (FREEMEM for the old array chunk, USEMEM
                         :  * for the new one).  Whenever false is returned, growmemtuples is false
                         :  * on exit, so later calls return immediately.  If LACKMEM is true after
                         :  * the reallocation, elog(ERROR) is raised instead of returning.
     576                 :  */
     577                 : static bool
     578             951 : grow_memtuples(Tuplestorestate *state)
     579                 : {
     580                 :     int         newmemtupsize;
     581             951 :     int         memtupsize = state->memtupsize;
     582             951 :     int64       memNowUsed = state->allowedMem - state->availMem;
     583                 : 
     584                 :     /* Forget it if we've already maxed out memtuples, per comment above */
     585             951 :     if (!state->growmemtuples)
     586              28 :         return false;
     587                 : 
     588                 :     /* Select new value of memtupsize */
     589             923 :     if (memNowUsed <= state->availMem)
     590                 :     {
     591                 :         /*
     592                 :          * We've used no more than half of allowedMem; double our usage,
     593                 :          * clamping at INT_MAX tuples.
     594                 :          */
     595             887 :         if (memtupsize < INT_MAX / 2)
     596             887 :             newmemtupsize = memtupsize * 2;
     597                 :         else
     598                 :         {
     599 UBC           0 :             newmemtupsize = INT_MAX;
     600               0 :             state->growmemtuples = false;
     601                 :         }
     602                 :     }
     603                 :     else
     604                 :     {
     605                 :         /*
     606                 :          * This will be the last increment of memtupsize.  Abandon doubling
     607                 :          * strategy and instead increase as much as we safely can.
     608                 :          *
     609                 :          * To stay within allowedMem, we can't increase memtupsize by more
     610                 :          * than availMem / sizeof(void *) elements. In practice, we want to
     611                 :          * increase it by considerably less, because we need to leave some
     612                 :          * space for the tuples to which the new array slots will refer.  We
     613                 :          * assume the new tuples will be about the same size as the tuples
     614                 :          * we've already seen, and thus we can extrapolate from the space
     615                 :          * consumption so far to estimate an appropriate new size for the
     616                 :          * memtuples array.  The optimal value might be higher or lower than
     617                 :          * this estimate, but it's hard to know that in advance.  We again
     618                 :          * clamp at INT_MAX tuples.
     619                 :          *
     620                 :          * This calculation is safe against enlarging the array so much that
     621                 :          * LACKMEM becomes true, because the memory currently used includes
     622                 :          * the present array; thus, there would be enough allowedMem for the
     623                 :          * new array elements even if no other memory were currently used.
     624                 :          *
     625                 :          * We do the arithmetic in float8, because otherwise the product of
     626                 :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
     627                 :          * result should be insignificant; but even if we computed a
     628                 :          * completely insane result, the checks below will prevent anything
     629                 :          * really bad from happening.
     630                 :          */
     631                 :         double      grow_ratio;
     632                 : 
     633 CBC          36 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
     634              36 :         if (memtupsize * grow_ratio < INT_MAX)
     635              36 :             newmemtupsize = (int) (memtupsize * grow_ratio);
     636                 :         else
     637 UBC           0 :             newmemtupsize = INT_MAX;
     638                 : 
     639                 :         /* We won't make any further enlargement attempts */
     640 CBC          36 :         state->growmemtuples = false;
     641                 :     }
     642                 : 
     643                 :     /* Must enlarge array by at least one element, else report failure */
     644             923 :     if (newmemtupsize <= memtupsize)
     645 UBC           0 :         goto noalloc;
     646                 : 
     647                 :     /*
     648                 :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
     649                 :      * to ensure our request won't be rejected.  Note that we can easily
     650                 :      * exhaust address space before facing this outcome.  (This is presently
     651                 :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
     652                 :      * don't rely on that at this distance.)
     653                 :      */
     654 CBC         923 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
     655                 :     {
     656 UBC           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
     657               0 :         state->growmemtuples = false;    /* can't grow any more */
     658                 :     }
     659                 : 
     660                 :     /*
     661                 :      * We need to be sure that we do not cause LACKMEM to become true, else
     662                 :      * the space management algorithm will go nuts.  The code above should
     663                 :      * never generate a dangerous request, but to be safe, check explicitly
     664                 :      * that the array growth fits within availMem.  (We could still cause
     665                 :      * LACKMEM if the memory chunk overhead associated with the memtuples
     666                 :      * array were to increase.  That shouldn't happen because we chose the
     667                 :      * initial array size large enough to ensure that palloc will be treating
     668                 :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
     669                 :      * explicitly below just in case.)
     670                 :      */
     671 CBC         923 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
     672 UBC           0 :         goto noalloc;
     673                 : 
     674                 :     /* OK, do it */
     675 CBC         923 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));  /* un-charge the old array chunk */
     676             923 :     state->memtupsize = newmemtupsize;
     677             923 :     state->memtuples = (void **)
     678             923 :         repalloc_huge(state->memtuples,
     679             923 :                       state->memtupsize * sizeof(void *));
     680             923 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));   /* charge the enlarged array chunk */
     681             923 :     if (LACKMEM(state))
     682 UBC           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplestore");
     683 CBC         923 :     return true;
     684                 : 
     685 UBC           0 : noalloc:
     686                 :     /* If for any reason we didn't realloc, shut off future attempts */
     687               0 :     state->growmemtuples = false;
     688               0 :     return false;
     689                 : }
     690                 : 
     691                 : /*
     692                 :  * Accept one tuple and append it to the tuplestore.
     693                 :  *
     694                 :  * Note that the input tuple is always copied; the caller need not save it.
     695                 :  *
     696                 :  * If the active read pointer is currently "at EOF", it remains so (the read
     697                 :  * pointer implicitly advances along with the write pointer); otherwise the
     698                 :  * read pointer is unchanged.  Non-active read pointers do not move, which
     699                 :  * means they are certain to not be "at EOF" immediately after puttuple.
     700                 :  * This curious-seeming behavior is for the convenience of nodeMaterial.c and
     701                 :  * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
     702                 :  * steps.
     703                 :  *
     704                 :  * tuplestore_puttupleslot() is a convenience routine to collect data from
     705                 :  * a TupleTableSlot without an extra copy operation.
                         :  *
                         :  * The MinimalTuple is built by ExecCopySlotMinimalTuple() while
                         :  * state->context is the current memory context, its chunk space is charged
                         :  * with USEMEM, and it is then handed to tuplestore_puttuple_common().  The
                         :  * caller's memory context is restored before returning.
     706                 :  */
     707                 : void
     708 CBC      903966 : tuplestore_puttupleslot(Tuplestorestate *state,
     709                 :                         TupleTableSlot *slot)
     710                 : {
     711                 :     MinimalTuple tuple;
     712          903966 :     MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
     713                 : 
     714                 :     /*
     715                 :      * Form a MinimalTuple in working memory
     716                 :      */
     717          903966 :     tuple = ExecCopySlotMinimalTuple(slot);
     718          903966 :     USEMEM(state, GetMemoryChunkSpace(tuple));
     719                 : 
     720          903966 :     tuplestore_puttuple_common(state, (void *) tuple);
     721                 : 
     722          903966 :     MemoryContextSwitchTo(oldcxt);
     723          903966 : }
     724                 : 
     725                 : /*
     726                 :  * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
     727                 :  * deprecated, but not worth getting rid of in view of the number of callers.
                         :  *
                         :  * The copy is made by COPYTUP with state->context as the current memory
                         :  * context; the copy, not the caller's tuple, is what gets passed to
                         :  * tuplestore_puttuple_common().  The caller's memory context is restored
                         :  * before returning.
     728                 :  */
     729                 : void
     730          464617 : tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
     731                 : {
     732          464617 :     MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
     733                 : 
     734                 :     /*
     735                 :      * Copy the tuple.  (Must do this even in WRITEFILE case.  Note that
     736                 :      * COPYTUP includes USEMEM, so we needn't do that here.)
     737                 :      */
     738          464617 :     tuple = COPYTUP(state, tuple);  /* local 'tuple' now refers to the copy */
     739                 : 
     740          464617 :     tuplestore_puttuple_common(state, (void *) tuple);
     741                 : 
     742          464617 :     MemoryContextSwitchTo(oldcxt);
     743          464617 : }
     744                 : 
     745                 : /*
     746                 :  * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
     747                 :  * This avoids an extra tuple-construction operation.
                         :  *
                         :  * The tuple is formed by heap_form_minimal_tuple() from tdesc, values and
                         :  * isnull while state->context is the current memory context, its chunk
                         :  * space is charged with USEMEM, and it is stored via
                         :  * tuplestore_puttuple_common().  The caller's memory context is restored
                         :  * before returning.
     748                 :  */
     749                 : void
     750         7146291 : tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
     751                 :                      Datum *values, bool *isnull)
     752                 : {
     753                 :     MinimalTuple tuple;
     754         7146291 :     MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
     755                 : 
     756         7146291 :     tuple = heap_form_minimal_tuple(tdesc, values, isnull);
     757         7146291 :     USEMEM(state, GetMemoryChunkSpace(tuple));
     758                 : 
     759         7146291 :     tuplestore_puttuple_common(state, (void *) tuple);
     760                 : 
     761         7146291 :     MemoryContextSwitchTo(oldcxt);
     762         7146291 : }
     763                 : /*
                         :  * tuplestore_puttuple_common
                         :  *
                         :  * Shared back end of the tuplestore_put* routines above: store one tuple
                         :  * (already built or copied by the caller) according to the current status.
                         :  *
                         :  * state->tuples is incremented first.  Then:
                         :  *  - TSS_INMEM: non-active read pointers that were at EOF have the flag
                         :  *    cleared and are pointed at the slot the new tuple will occupy; the
                         :  *    array is grown if needed and the tuple stored.  If array slots or
                         :  *    memory run out, a temp file is created, status becomes
                         :  *    TSS_WRITEFILE and dumptuples() is called.
                         :  *  - TSS_WRITEFILE: non-active read pointers that were at EOF get the
                         :  *    current file position, then the tuple is written with WRITETUP.
                         :  *  - TSS_READFILE: the active pointer's position is saved (unless it is
                         :  *    at EOF), the file is sought to the saved write position, status
                         :  *    becomes TSS_WRITEFILE, non-active EOF pointers are set to that write
                         :  *    position, and the tuple is written with WRITETUP.
                         :  * Any other status is reported with elog(ERROR); a failed seek with
                         :  * ereport(ERROR).
                         :  */
     764                 : static void
     765         8514874 : tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
     766                 : {
     767                 :     TSReadPointer *readptr;
     768                 :     int         i;
     769                 :     ResourceOwner oldowner;
     770                 : 
     771         8514874 :     state->tuples++;
     772                 : 
     773         8514874 :     switch (state->status)
     774                 :     {
     775         6319065 :         case TSS_INMEM:
     776                 : 
     777                 :             /*
     778                 :              * Update read pointers as needed; see API spec above.
     779                 :              */
     780         6319065 :             readptr = state->readptrs;
     781        13720336 :             for (i = 0; i < state->readptrcount; readptr++, i++)
     782                 :             {
     783         7401271 :                 if (readptr->eof_reached && i != state->activeptr)
     784                 :                 {
     785             215 :                     readptr->eof_reached = false;
     786             215 :                     readptr->current = state->memtupcount;  /* index the new tuple is stored at below */
     787                 :                 }
     788                 :             }
     789                 : 
     790                 :             /*
     791                 :              * Grow the array as needed.  Note that we try to grow the array
     792                 :              * when there is still one free slot remaining --- if we fail,
     793                 :              * there'll still be room to store the incoming tuple, and then
     794                 :              * we'll switch to tape-based operation.
     795                 :              */
     796         6319065 :             if (state->memtupcount >= state->memtupsize - 1)
     797                 :             {
     798             951 :                 (void) grow_memtuples(state);
     799             951 :                 Assert(state->memtupcount < state->memtupsize);
     800                 :             }
     801                 : 
     802                 :             /* Stash the tuple in the in-memory array */
     803         6319065 :             state->memtuples[state->memtupcount++] = tuple;
     804                 : 
     805                 :             /*
     806                 :              * Done if we still fit in available memory and have array slots.
     807                 :              */
     808         6319065 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
     809         6319004 :                 return;
     810                 : 
     811                 :             /*
     812                 :              * Nope; time to switch to tape-based operation.  Make sure that
     813                 :              * the temp file(s) are created in suitable temp tablespaces.
     814                 :              */
     815              61 :             PrepareTempTablespaces();
     816                 : 
     817                 :             /* associate the file with the store's resource owner */
     818              61 :             oldowner = CurrentResourceOwner;
     819              61 :             CurrentResourceOwner = state->resowner;
     820                 : 
     821              61 :             state->myfile = BufFileCreateTemp(state->interXact);
     822                 : 
     823              61 :             CurrentResourceOwner = oldowner;
     824                 : 
     825                 :             /*
     826                 :              * Freeze the decision about whether trailing length words will be
     827                 :              * used.  We can't change this choice once data is on tape, even
     828                 :              * though callers might drop the requirement.
     829                 :              */
     830              61 :             state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
     831              61 :             state->status = TSS_WRITEFILE;
     832              61 :             dumptuples(state);
     833              61 :             break;
     834         2195809 :         case TSS_WRITEFILE:
     835                 : 
     836                 :             /*
     837                 :              * Update read pointers as needed; see API spec above. Note:
     838                 :              * BufFileTell is quite cheap, so not worth trying to avoid
     839                 :              * multiple calls.
     840                 :              */
     841         2195809 :             readptr = state->readptrs;
     842         4391618 :             for (i = 0; i < state->readptrcount; readptr++, i++)
     843                 :             {
     844         2195809 :                 if (readptr->eof_reached && i != state->activeptr)
     845                 :                 {
     846 UBC           0 :                     readptr->eof_reached = false;
     847               0 :                     BufFileTell(state->myfile,
     848                 :                                 &readptr->file,
     849                 :                                 &readptr->offset);
     850                 :                 }
     851                 :             }
     852                 : 
     853 CBC     2195809 :             WRITETUP(state, tuple);
     854         2195809 :             break;
     855 UBC           0 :         case TSS_READFILE:
     856                 : 
     857                 :             /*
     858                 :              * Switch from reading to writing.
     859                 :              */
     860               0 :             if (!state->readptrs[state->activeptr].eof_reached)
     861               0 :                 BufFileTell(state->myfile,
     862               0 :                             &state->readptrs[state->activeptr].file,
     863               0 :                             &state->readptrs[state->activeptr].offset);
     864               0 :             if (BufFileSeek(state->myfile,
     865                 :                             state->writepos_file, state->writepos_offset,
     866                 :                             SEEK_SET) != 0)
     867               0 :                 ereport(ERROR,
     868                 :                         (errcode_for_file_access(),
     869                 :                          errmsg("could not seek in tuplestore temporary file")));
     870               0 :             state->status = TSS_WRITEFILE;
     871                 : 
     872                 :             /*
     873                 :              * Update read pointers as needed; see API spec above.
     874                 :              */
     875               0 :             readptr = state->readptrs;
     876               0 :             for (i = 0; i < state->readptrcount; readptr++, i++)
     877                 :             {
     878               0 :                 if (readptr->eof_reached && i != state->activeptr)
     879                 :                 {
     880               0 :                     readptr->eof_reached = false;
     881               0 :                     readptr->file = state->writepos_file;
     882               0 :                     readptr->offset = state->writepos_offset;
     883                 :                 }
     884                 :             }
     885                 : 
     886               0 :             WRITETUP(state, tuple);
     887               0 :             break;
     888               0 :         default:
     889               0 :             elog(ERROR, "invalid tuplestore state");
     890                 :             break;
     891                 :     }
     892                 : }
     893                 : 
     894                 : /*
     895                 :  * Fetch the next tuple in either forward or back direction.
     896                 :  * Returns NULL if no more tuples.  If should_free is set, the
     897                 :  * caller must pfree the returned tuple when done with it.
     898                 :  *
     899                 :  * Backward scan is only allowed if randomAccess was set true or
     900                 :  * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
     901                 :  */
     902                 : static void *
     903 CBC    11072013 : tuplestore_gettuple(Tuplestorestate *state, bool forward,
     904                 :                     bool *should_free)
     905                 : {
     906        11072013 :     TSReadPointer *readptr = &state->readptrs[state->activeptr];
     907                 :     unsigned int tuplen;
     908                 :     void       *tup;
     909                 : 
     910        11072013 :     Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
     911                 : 
     912        11072013 :     switch (state->status)
     913                 :     {
     914         7386725 :         case TSS_INMEM:
     915         7386725 :             *should_free = false;
     916         7386725 :             if (forward)
     917                 :             {
     918         7300000 :                 if (readptr->eof_reached)
     919              90 :                     return NULL;
     920         7299910 :                 if (readptr->current < state->memtupcount)
     921                 :                 {
     922                 :                     /* We have another tuple, so return it */
     923         6943202 :                     return state->memtuples[readptr->current++];
     924                 :                 }
     925          356708 :                 readptr->eof_reached = true;
     926          356708 :                 return NULL;
     927                 :             }
     928                 :             else
     929                 :             {
     930                 :                 /*
     931                 :                  * if all tuples are fetched already then we return last
     932                 :                  * tuple, else tuple before last returned.
     933                 :                  */
     934           86725 :                 if (readptr->eof_reached)
     935                 :                 {
     936             890 :                     readptr->current = state->memtupcount;
     937             890 :                     readptr->eof_reached = false;
     938                 :                 }
     939                 :                 else
     940                 :                 {
     941           85835 :                     if (readptr->current <= state->memtupdeleted)
     942                 :                     {
     943 UBC           0 :                         Assert(!state->truncated);
     944               0 :                         return NULL;
     945                 :                     }
     946 CBC       85835 :                     readptr->current--; /* last returned tuple */
     947                 :                 }
     948           86725 :                 if (readptr->current <= state->memtupdeleted)
     949                 :                 {
     950              15 :                     Assert(!state->truncated);
     951              15 :                     return NULL;
     952                 :                 }
     953           86710 :                 return state->memtuples[readptr->current - 1];
     954                 :             }
     955                 :             break;
     956                 : 
     957              60 :         case TSS_WRITEFILE:
     958                 :             /* Skip state change if we'll just return NULL */
     959              60 :             if (readptr->eof_reached && forward)
     960 UBC           0 :                 return NULL;
     961                 : 
     962                 :             /*
     963                 :              * Switch from writing to reading.
     964                 :              */
     965 CBC          60 :             BufFileTell(state->myfile,
     966                 :                         &state->writepos_file, &state->writepos_offset);
     967              60 :             if (!readptr->eof_reached)
     968              60 :                 if (BufFileSeek(state->myfile,
     969                 :                                 readptr->file, readptr->offset,
     970                 :                                 SEEK_SET) != 0)
     971 UBC           0 :                     ereport(ERROR,
     972                 :                             (errcode_for_file_access(),
     973                 :                              errmsg("could not seek in tuplestore temporary file")));
     974 CBC          60 :             state->status = TSS_READFILE;
     975                 :             /* FALLTHROUGH */
     976                 : 
     977         3685288 :         case TSS_READFILE:
     978         3685288 :             *should_free = true;
     979         3685288 :             if (forward)
     980                 :             {
     981         3685282 :                 if ((tuplen = getlen(state, true)) != 0)
     982                 :                 {
     983         3685228 :                     tup = READTUP(state, tuplen);
     984         3685228 :                     return tup;
     985                 :                 }
     986                 :                 else
     987                 :                 {
     988              54 :                     readptr->eof_reached = true;
     989              54 :                     return NULL;
     990                 :                 }
     991                 :             }
     992                 : 
     993                 :             /*
     994                 :              * Backward.
     995                 :              *
     996                 :              * if all tuples are fetched already then we return last tuple,
     997                 :              * else tuple before last returned.
     998                 :              *
     999                 :              * Back up to fetch previously-returned tuple's ending length
    1000                 :              * word. If seek fails, assume we are at start of file.
    1001                 :              */
    1002               6 :             if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
    1003                 :                             SEEK_CUR) != 0)
    1004                 :             {
    1005                 :                 /* even a failed backwards fetch gets you out of eof state */
    1006 UBC           0 :                 readptr->eof_reached = false;
    1007               0 :                 Assert(!state->truncated);
    1008               0 :                 return NULL;
    1009                 :             }
    1010 CBC           6 :             tuplen = getlen(state, false);
    1011                 : 
    1012               6 :             if (readptr->eof_reached)
    1013                 :             {
    1014               3 :                 readptr->eof_reached = false;
    1015                 :                 /* We will return the tuple returned before returning NULL */
    1016                 :             }
    1017                 :             else
    1018                 :             {
    1019                 :                 /*
    1020                 :                  * Back up to get ending length word of tuple before it.
    1021                 :                  */
    1022               3 :                 if (BufFileSeek(state->myfile, 0,
    1023               3 :                                 -(long) (tuplen + 2 * sizeof(unsigned int)),
    1024                 :                                 SEEK_CUR) != 0)
    1025                 :                 {
    1026                 :                     /*
    1027                 :                      * If that fails, presumably the prev tuple is the first
    1028                 :                      * in the file.  Back up so that it becomes next to read
    1029                 :                      * in forward direction (not obviously right, but that is
    1030                 :                      * what in-memory case does).
    1031                 :                      */
    1032 UBC           0 :                     if (BufFileSeek(state->myfile, 0,
    1033               0 :                                     -(long) (tuplen + sizeof(unsigned int)),
    1034                 :                                     SEEK_CUR) != 0)
    1035               0 :                         ereport(ERROR,
    1036                 :                                 (errcode_for_file_access(),
    1037                 :                                  errmsg("could not seek in tuplestore temporary file")));
    1038               0 :                     Assert(!state->truncated);
    1039               0 :                     return NULL;
    1040                 :                 }
    1041 CBC           3 :                 tuplen = getlen(state, false);
    1042                 :             }
    1043                 : 
    1044                 :             /*
    1045                 :              * Now we have the length of the prior tuple, back up and read it.
    1046                 :              * Note: READTUP expects we are positioned after the initial
    1047                 :              * length word of the tuple, so back up to that point.
    1048                 :              */
    1049               6 :             if (BufFileSeek(state->myfile, 0,
    1050               6 :                             -(long) tuplen,
    1051                 :                             SEEK_CUR) != 0)
    1052 UBC           0 :                 ereport(ERROR,
    1053                 :                         (errcode_for_file_access(),
    1054                 :                          errmsg("could not seek in tuplestore temporary file")));
    1055 CBC           6 :             tup = READTUP(state, tuplen);
    1056               6 :             return tup;
    1057                 : 
    1058 UBC           0 :         default:
    1059               0 :             elog(ERROR, "invalid tuplestore state");
    1060                 :             return NULL;        /* keep compiler quiet */
    1061                 :     }
    1062                 : }
    1063                 : 
    1064                 : /*
    1065                 :  * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
    1066                 :  *
    1067                 :  * If successful, put tuple in slot and return true; else, clear the slot
    1068                 :  * and return false.
    1069                 :  *
    1070                 :  * If copy is true, the slot receives a copied tuple (allocated in current
    1071                 :  * memory context) that will stay valid regardless of future manipulations of
    1072                 :  * the tuplestore's state.  If copy is false, the slot may just receive a
    1073                 :  * pointer to a tuple held within the tuplestore.  The latter is more
    1074                 :  * efficient but the slot contents may be corrupted if additional writes to
    1075                 :  * the tuplestore occur.  (If using tuplestore_trim, see comments therein.)
    1076                 :  */
    1077                 : bool
    1078 CBC    10986736 : tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
    1079                 :                         bool copy, TupleTableSlot *slot)
    1080                 : {
    1081                 :     MinimalTuple tuple;
    1082                 :     bool        should_free;
    1083                 : 
    1084        10986736 :     tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
    1085                 : 
    1086        10986736 :     if (tuple)
    1087                 :     {
    1088        10630735 :         if (copy && !should_free)
    1089                 :         {
    1090          840164 :             tuple = heap_copy_minimal_tuple(tuple);
    1091          840164 :             should_free = true;
    1092                 :         }
    1093        10630735 :         ExecStoreMinimalTuple(tuple, slot, should_free);
    1094        10630735 :         return true;
    1095                 :     }
    1096                 :     else
    1097                 :     {
    1098          356001 :         ExecClearTuple(slot);
    1099          356001 :         return false;
    1100                 :     }
    1101                 : }
    1102                 : 
    1103                 : /*
    1104                 :  * tuplestore_advance - exported function to adjust position without fetching
    1105                 :  *
    1106                 :  * We could optimize this case to avoid palloc/pfree overhead, but for the
    1107                 :  * moment it doesn't seem worthwhile.
    1108                 :  */
    1109                 : bool
    1110           85271 : tuplestore_advance(Tuplestorestate *state, bool forward)
    1111                 : {
    1112                 :     void       *tuple;
    1113                 :     bool        should_free;
    1114                 : 
    1115           85271 :     tuple = tuplestore_gettuple(state, forward, &should_free);
    1116                 : 
    1117           85271 :     if (tuple)
    1118                 :     {
    1119           84405 :         if (should_free)
    1120 UBC           0 :             pfree(tuple);
    1121 CBC       84405 :         return true;
    1122                 :     }
    1123                 :     else
    1124                 :     {
    1125             866 :         return false;
    1126                 :     }
    1127                 : }
    1128                 : 
    1129                 : /*
    1130                 :  * Advance over N tuples in either forward or back direction,
    1131                 :  * without returning any data.  N<=0 is a no-op.
    1132                 :  * Returns true if successful, false if ran out of tuples.
    1133                 :  */
    1134                 : bool
    1135          610077 : tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
    1136                 : {
    1137          610077 :     TSReadPointer *readptr = &state->readptrs[state->activeptr];
    1138                 : 
    1139          610077 :     Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
    1140                 : 
    1141          610077 :     if (ntuples <= 0)
    1142               9 :         return true;
    1143                 : 
    1144          610068 :     switch (state->status)
    1145                 :     {
    1146          610065 :         case TSS_INMEM:
    1147          610065 :             if (forward)
    1148                 :             {
    1149          608759 :                 if (readptr->eof_reached)
    1150 UBC           0 :                     return false;
    1151 CBC      608759 :                 if (state->memtupcount - readptr->current >= ntuples)
    1152                 :                 {
    1153          608714 :                     readptr->current += ntuples;
    1154          608714 :                     return true;
    1155                 :                 }
    1156              45 :                 readptr->current = state->memtupcount;
    1157              45 :                 readptr->eof_reached = true;
    1158              45 :                 return false;
    1159                 :             }
    1160                 :             else
    1161                 :             {
    1162            1306 :                 if (readptr->eof_reached)
    1163                 :                 {
    1164 UBC           0 :                     readptr->current = state->memtupcount;
    1165               0 :                     readptr->eof_reached = false;
    1166               0 :                     ntuples--;
    1167                 :                 }
    1168 CBC        1306 :                 if (readptr->current - state->memtupdeleted > ntuples)
    1169                 :                 {
    1170            1306 :                     readptr->current -= ntuples;
    1171            1306 :                     return true;
    1172                 :                 }
    1173 UBC           0 :                 Assert(!state->truncated);
    1174               0 :                 readptr->current = state->memtupdeleted;
    1175               0 :                 return false;
    1176                 :             }
    1177                 :             break;
    1178                 : 
    1179 CBC           3 :         default:
    1180                 :             /* We don't currently try hard to optimize other cases */
    1181               9 :             while (ntuples-- > 0)
    1182                 :             {
    1183                 :                 void       *tuple;
    1184                 :                 bool        should_free;
    1185                 : 
    1186               6 :                 tuple = tuplestore_gettuple(state, forward, &should_free);
    1187                 : 
    1188               6 :                 if (tuple == NULL)
    1189 UBC           0 :                     return false;
    1190 CBC           6 :                 if (should_free)
    1191               6 :                     pfree(tuple);
    1192               6 :                 CHECK_FOR_INTERRUPTS();
    1193                 :             }
    1194               3 :             return true;
    1195                 :     }
    1196                 : }
    1197                 : 
    1198                 : /*
    1199                 :  * dumptuples - remove tuples from memory and write to tape
    1200                 :  *
    1201                 :  * As a side effect, we must convert each read pointer's position from
    1202                 :  * "current" to file/offset format.  But eof_reached pointers don't
    1203                 :  * need to change state.
    1204                 :  */
    1205                 : static void
    1206              61 : dumptuples(Tuplestorestate *state)
    1207                 : {
    1208                 :     int         i;
    1209                 : 
    1210              61 :     for (i = state->memtupdeleted;; i++)
    1211         1748052 :     {
    1212         1748113 :         TSReadPointer *readptr = state->readptrs;
    1213                 :         int         j;
    1214                 : 
    1215         3496226 :         for (j = 0; j < state->readptrcount; readptr++, j++)
    1216                 :         {
    1217         1748113 :             if (i == readptr->current && !readptr->eof_reached)
    1218              61 :                 BufFileTell(state->myfile,
    1219                 :                             &readptr->file, &readptr->offset);
    1220                 :         }
    1221         1748113 :         if (i >= state->memtupcount)
    1222              61 :             break;
    1223         1748052 :         WRITETUP(state, state->memtuples[i]);
    1224                 :     }
    1225              61 :     state->memtupdeleted = 0;
    1226              61 :     state->memtupcount = 0;
    1227              61 : }
    1228                 : 
    1229                 : /*
    1230                 :  * tuplestore_rescan        - rewind the active read pointer to start
    1231                 :  */
    1232                 : void
    1233          329202 : tuplestore_rescan(Tuplestorestate *state)
    1234                 : {
    1235          329202 :     TSReadPointer *readptr = &state->readptrs[state->activeptr];
    1236                 : 
    1237          329202 :     Assert(readptr->eflags & EXEC_FLAG_REWIND);
    1238          329202 :     Assert(!state->truncated);
    1239                 : 
    1240          329202 :     switch (state->status)
    1241                 :     {
    1242          329142 :         case TSS_INMEM:
    1243          329142 :             readptr->eof_reached = false;
    1244          329142 :             readptr->current = 0;
    1245          329142 :             break;
    1246              60 :         case TSS_WRITEFILE:
    1247              60 :             readptr->eof_reached = false;
    1248              60 :             readptr->file = 0;
    1249 GNC          60 :             readptr->offset = 0;
    1250 CBC          60 :             break;
    1251 UBC           0 :         case TSS_READFILE:
    1252               0 :             readptr->eof_reached = false;
    1253 UNC           0 :             if (BufFileSeek(state->myfile, 0, 0, SEEK_SET) != 0)
    1254 UBC           0 :                 ereport(ERROR,
    1255                 :                         (errcode_for_file_access(),
    1256                 :                          errmsg("could not seek in tuplestore temporary file")));
    1257               0 :             break;
    1258               0 :         default:
    1259               0 :             elog(ERROR, "invalid tuplestore state");
    1260                 :             break;
    1261                 :     }
    1262 CBC      329202 : }
    1263                 : 
    1264                 : /*
    1265                 :  * tuplestore_copy_read_pointer - copy a read pointer's state to another
    1266                 :  */
    1267                 : void
    1268           30403 : tuplestore_copy_read_pointer(Tuplestorestate *state,
    1269                 :                              int srcptr, int destptr)
    1270                 : {
    1271           30403 :     TSReadPointer *sptr = &state->readptrs[srcptr];
    1272           30403 :     TSReadPointer *dptr = &state->readptrs[destptr];
    1273                 : 
    1274           30403 :     Assert(srcptr >= 0 && srcptr < state->readptrcount);
    1275           30403 :     Assert(destptr >= 0 && destptr < state->readptrcount);
    1276                 : 
    1277                 :     /* Assigning to self is a no-op */
    1278           30403 :     if (srcptr == destptr)
    1279 UBC           0 :         return;
    1280                 : 
    1281 CBC       30403 :     if (dptr->eflags != sptr->eflags)
    1282                 :     {
    1283                 :         /* Possible change of overall eflags, so copy and then recompute */
    1284                 :         int         eflags;
    1285                 :         int         i;
    1286                 : 
    1287 UBC           0 :         *dptr = *sptr;
    1288               0 :         eflags = state->readptrs[0].eflags;
    1289               0 :         for (i = 1; i < state->readptrcount; i++)
    1290               0 :             eflags |= state->readptrs[i].eflags;
    1291               0 :         state->eflags = eflags;
    1292                 :     }
    1293                 :     else
    1294 CBC       30403 :         *dptr = *sptr;
    1295                 : 
    1296           30403 :     switch (state->status)
    1297                 :     {
    1298           30403 :         case TSS_INMEM:
    1299                 :         case TSS_WRITEFILE:
    1300                 :             /* no work */
    1301           30403 :             break;
    1302 UBC           0 :         case TSS_READFILE:
    1303                 : 
    1304                 :             /*
    1305                 :              * This case is a bit tricky since the active read pointer's
    1306                 :              * position corresponds to the seek point, not what is in its
    1307                 :              * variables.  Assigning to the active requires a seek, and
    1308                 :              * assigning from the active requires a tell, except when
    1309                 :              * eof_reached.
    1310                 :              */
    1311               0 :             if (destptr == state->activeptr)
    1312                 :             {
    1313               0 :                 if (dptr->eof_reached)
    1314                 :                 {
    1315               0 :                     if (BufFileSeek(state->myfile,
    1316                 :                                     state->writepos_file,
    1317                 :                                     state->writepos_offset,
    1318                 :                                     SEEK_SET) != 0)
    1319               0 :                         ereport(ERROR,
    1320                 :                                 (errcode_for_file_access(),
    1321                 :                                  errmsg("could not seek in tuplestore temporary file")));
    1322                 :                 }
    1323                 :                 else
    1324                 :                 {
    1325               0 :                     if (BufFileSeek(state->myfile,
    1326                 :                                     dptr->file, dptr->offset,
    1327                 :                                     SEEK_SET) != 0)
    1328               0 :                         ereport(ERROR,
    1329                 :                                 (errcode_for_file_access(),
    1330                 :                                  errmsg("could not seek in tuplestore temporary file")));
    1331                 :                 }
    1332                 :             }
    1333               0 :             else if (srcptr == state->activeptr)
    1334                 :             {
    1335               0 :                 if (!dptr->eof_reached)
    1336               0 :                     BufFileTell(state->myfile,
    1337                 :                                 &dptr->file,
    1338                 :                                 &dptr->offset);
    1339                 :             }
    1340               0 :             break;
    1341               0 :         default:
    1342               0 :             elog(ERROR, "invalid tuplestore state");
    1343                 :             break;
    1344                 :     }
    1345                 : }
    1346                 : 
    1347                 : /*
    1348                 :  * tuplestore_trim  - remove all no-longer-needed tuples
    1349                 :  *
    1350                 :  * Calling this function authorizes the tuplestore to delete all tuples
    1351                 :  * before the oldest read pointer, if no read pointer is marked as requiring
    1352                 :  * REWIND capability.
    1353                 :  *
    1354                 :  * Note: this is obviously safe if no pointer has BACKWARD capability either.
    1355                 :  * If a pointer is marked as BACKWARD but not REWIND capable, it means that
    1356                 :  * the pointer can be moved backward but not before the oldest other read
    1357                 :  * pointer.
    1358                 :  */
    1359                 : void
    1360 CBC      397670 : tuplestore_trim(Tuplestorestate *state)
    1361                 : {
    1362                 :     int         oldest;
    1363                 :     int         nremove;
    1364                 :     int         i;
    1365                 : 
    1366                 :     /*
    1367                 :      * Truncation is disallowed if any read pointer requires rewind
    1368                 :      * capability.
    1369                 :      */
    1370          397670 :     if (state->eflags & EXEC_FLAG_REWIND)
    1371 UBC           0 :         return;
    1372                 : 
    1373                 :     /*
    1374                 :      * We don't bother trimming temp files since it usually would mean more
    1375                 :      * work than just letting them sit in kernel buffers until they age out.
    1376                 :      */
    1377 CBC      397670 :     if (state->status != TSS_INMEM)
    1378 UBC           0 :         return;
    1379                 : 
    1380                 :     /* Find the oldest read pointer */
    1381 CBC      397670 :     oldest = state->memtupcount;
    1382         1716894 :     for (i = 0; i < state->readptrcount; i++)
    1383                 :     {
    1384         1319224 :         if (!state->readptrs[i].eof_reached)
    1385         1305065 :             oldest = Min(oldest, state->readptrs[i].current);
    1386                 :     }
    1387                 : 
    1388                 :     /*
    1389                 :      * Note: you might think we could remove all the tuples before the oldest
    1390                 :      * "current", since that one is the next to be returned.  However, since
    1391                 :      * tuplestore_gettuple returns a direct pointer to our internal copy of
    1392                 :      * the tuple, it's likely that the caller has still got the tuple just
    1393                 :      * before "current" referenced in a slot. So we keep one extra tuple
    1394                 :      * before the oldest "current".  (Strictly speaking, we could require such
    1395                 :      * callers to use the "copy" flag to tuplestore_gettupleslot, but for
    1396                 :      * efficiency we allow this one case to not use "copy".)
    1397                 :      */
    1398          397670 :     nremove = oldest - 1;
    1399          397670 :     if (nremove <= 0)
    1400            3069 :         return;                 /* nothing to do */
    1401                 : 
    1402          394601 :     Assert(nremove >= state->memtupdeleted);
    1403          394601 :     Assert(nremove <= state->memtupcount);
    1404                 : 
    1405                 :     /* Release no-longer-needed tuples */
    1406          789697 :     for (i = state->memtupdeleted; i < nremove; i++)
    1407                 :     {
    1408          395096 :         FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
    1409          395096 :         pfree(state->memtuples[i]);
    1410          395096 :         state->memtuples[i] = NULL;
    1411                 :     }
    1412          394601 :     state->memtupdeleted = nremove;
    1413                 : 
    1414                 :     /* mark tuplestore as truncated (used for Assert crosschecks only) */
    1415          394601 :     state->truncated = true;
    1416                 : 
    1417                 :     /*
    1418                 :      * If nremove is less than 1/8th memtupcount, just stop here, leaving the
    1419                 :      * "deleted" slots as NULL.  This prevents us from expending O(N^2) time
    1420                 :      * repeatedly memmove-ing a large pointer array.  The worst case space
    1421                 :      * wastage is pretty small, since it's just pointers and not whole tuples.
    1422                 :      */
    1423          394601 :     if (nremove < state->memtupcount / 8)
    1424           56340 :         return;
    1425                 : 
    1426                 :     /*
    1427                 :      * Slide the array down and readjust pointers.
    1428                 :      *
    1429                 :      * In mergejoin's current usage, it's demonstrable that there will always
    1430                 :      * be exactly one non-removed tuple; so optimize that case.
    1431                 :      */
    1432          338261 :     if (nremove + 1 == state->memtupcount)
    1433          300071 :         state->memtuples[0] = state->memtuples[nremove];
    1434                 :     else
    1435           38190 :         memmove(state->memtuples, state->memtuples + nremove,
    1436           38190 :                 (state->memtupcount - nremove) * sizeof(void *));
    1437                 : 
    1438          338261 :     state->memtupdeleted = 0;
    1439          338261 :     state->memtupcount -= nremove;
    1440         1474266 :     for (i = 0; i < state->readptrcount; i++)
    1441                 :     {
    1442         1136005 :         if (!state->readptrs[i].eof_reached)
    1443         1133999 :             state->readptrs[i].current -= nremove;
    1444                 :     }
    1445                 : }
    1446                 : 
    1447                 : /*
    1448                 :  * tuplestore_in_memory
    1449                 :  *
    1450                 :  * Returns true if the tuplestore has not spilled to disk.
    1451                 :  *
    1452                 :  * XXX exposing this is a violation of modularity ... should get rid of it.
    1453                 :  */
    1454                 : bool
    1455          759883 : tuplestore_in_memory(Tuplestorestate *state)
    1456                 : {
    1457          759883 :     return (state->status == TSS_INMEM);
    1458                 : }
    1459                 : 
    1460                 : 
    1461                 : /*
    1462                 :  * Tape interface routines
    1463                 :  */
    1464                 : 
    1465                 : static unsigned int
    1466         3685291 : getlen(Tuplestorestate *state, bool eofOK)
    1467                 : {
    1468                 :     unsigned int len;
    1469                 :     size_t      nbytes;
    1470                 : 
    1471 GNC     3685291 :     nbytes = BufFileReadMaybeEOF(state->myfile, &len, sizeof(len), eofOK);
    1472         3685291 :     if (nbytes == 0)
    1473              54 :         return 0;
    1474                 :     else
    1475 CBC     3685237 :         return len;
    1476                 : }
    1477                 : 
    1478                 : 
    1479                 : /*
    1480                 :  * Routines specialized for HeapTuple case
    1481                 :  *
    1482                 :  * The stored form is actually a MinimalTuple, but for largely historical
    1483                 :  * reasons we allow COPYTUP to work from a HeapTuple.
    1484                 :  *
    1485                 :  * Since MinimalTuple already has length in its first word, we don't need
    1486 ECB             :  * to write that separately.
    1487                 :  */
    1488                 : 
    1489                 : static void *
    1490 CBC      464617 : copytup_heap(Tuplestorestate *state, void *tup)
    1491 ECB             : {
    1492                 :     MinimalTuple tuple;
    1493                 : 
    1494 GIC      464617 :     tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
    1495          464617 :     USEMEM(state, GetMemoryChunkSpace(tuple));
    1496 CBC      464617 :     return (void *) tuple;
    1497                 : }
    1498 ECB             : 
    1499                 : static void
    1500 GIC     3943861 : writetup_heap(Tuplestorestate *state, void *tup)
    1501 ECB             : {
    1502 CBC     3943861 :     MinimalTuple tuple = (MinimalTuple) tup;
    1503                 : 
    1504                 :     /* the part of the MinimalTuple we'll write: */
    1505         3943861 :     char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
    1506 GIC     3943861 :     unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
    1507 ECB             : 
    1508                 :     /* total on-disk footprint: */
    1509 CBC     3943861 :     unsigned int tuplen = tupbodylen + sizeof(int);
    1510 ECB             : 
    1511 GNC     3943861 :     BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
    1512         3943861 :     BufFileWrite(state->myfile, tupbody, tupbodylen);
    1513 CBC     3943861 :     if (state->backward)     /* need trailing length word? */
    1514 GNC       30000 :         BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
    1515                 : 
    1516 GIC     3943861 :     FREEMEM(state, GetMemoryChunkSpace(tuple));
    1517 CBC     3943861 :     heap_free_minimal_tuple(tuple);
    1518 GIC     3943861 : }
    1519 ECB             : 
    1520                 : static void *
    1521 CBC     3685234 : readtup_heap(Tuplestorestate *state, unsigned int len)
    1522 ECB             : {
    1523 GIC     3685234 :     unsigned int tupbodylen = len - sizeof(int);
    1524 CBC     3685234 :     unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
    1525 GIC     3685234 :     MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
    1526 CBC     3685234 :     char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
    1527 ECB             : 
    1528 CBC     3685234 :     USEMEM(state, GetMemoryChunkSpace(tuple));
    1529 ECB             :     /* read in the tuple proper */
    1530 GIC     3685234 :     tuple->t_len = tuplen;
    1531 GNC     3685234 :     BufFileReadExact(state->myfile, tupbody, tupbodylen);
    1532 GIC     3685234 :     if (state->backward)     /* need trailing length word? */
    1533 GNC       30009 :         BufFileReadExact(state->myfile, &tuplen, sizeof(tuplen));
    1534 GIC     3685234 :     return (void *) tuple;
    1535                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a