LCOV - differential code coverage report
Current view: top level - src/backend/utils/sort - logtape.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 15:15:32
Baseline: 15
Baseline Date: 2023-04-08 15:09:40

Lines: 92.5 % (Total 348, Hit 322)
  UNC 1, LBC 9, UIC 12, UBC 4, GBC 4, GIC 188, GNC 11, CBC 119, EUB 16, ECB 192, DUB 2, DCB 2
Functions: 100.0 % (Total 26, Hit 26); category counts: 23 3 24 1

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * logtape.c
       4                 :  *    Management of "logical tapes" within temporary files.
       5                 :  *
       6                 :  * This module exists to support sorting via multiple merge passes (see
       7                 :  * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
       8                 :  * we implement it on disk by creating a separate file for each "tape",
       9                 :  * there is an annoying problem: the peak space usage is at least twice
      10                 :  * the volume of actual data to be sorted.  (This must be so because each
      11                 :  * datum will appear in both the input and output tapes of the final
      12                 :  * merge pass.)
      13                 :  *
      14                 :  * We can work around this problem by recognizing that any one tape
      15                 :  * dataset (with the possible exception of the final output) is written
      16                 :  * and read exactly once in a perfectly sequential manner.  Therefore,
      17                 :  * a datum once read will not be required again, and we can recycle its
      18                 :  * space for use by the new tape dataset(s) being generated.  In this way,
      19                 :  * the total space usage is essentially just the actual data volume, plus
      20                 :  * insignificant bookkeeping and start/stop overhead.
      21                 :  *
      22                 :  * Few OSes allow arbitrary parts of a file to be released back to the OS,
      23                 :  * so we have to implement this space-recycling ourselves within a single
      24                 :  * logical file.  logtape.c exists to perform this bookkeeping and provide
      25                 :  * the illusion of N independent tape devices to tuplesort.c.  Note that
      26                 :  * logtape.c itself depends on buffile.c to provide a "logical file" of
      27                 :  * larger size than the underlying OS may support.
      28                 :  *
      29                 :  * For simplicity, we allocate and release space in the underlying file
      30                 :  * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
      31                 :  * of which blocks in the underlying file belong to which logical tape,
      32                 :  * plus any blocks that are free (recycled and not yet reused).
      33                 :  * The blocks in each logical tape form a chain, with a prev- and next-
      34                 :  * pointer in each block.
      35                 :  *
      36                 :  * The initial write pass is guaranteed to fill the underlying file
      37                 :  * perfectly sequentially, no matter how data is divided into logical tapes.
      38                 :  * Once we begin merge passes, the access pattern becomes considerably
      39                 :  * less predictable --- but the seeking involved should be comparable to
      40                 :  * what would happen if we kept each logical tape in a separate file,
      41                 :  * so there's no serious performance penalty paid to obtain the space
      42                 :  * savings of recycling.  We try to localize the write accesses by always
      43                 :  * writing to the lowest-numbered free block when we have a choice; it's
      44                 :  * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
      45                 :  * policy for free blocks would be better?)
      46                 :  *
      47                 :  * To further make the I/Os more sequential, we can use a larger buffer
      48                 :  * when reading, and read multiple blocks from the same tape in one go,
      49                 :  * whenever the buffer becomes empty.
      50                 :  *
      51                 :  * To support the above policy of writing to the lowest free block, the
      52                 :  * freelist is a min heap.
      53                 :  *
      54                 :  * Since all the bookkeeping and buffer memory is allocated with palloc(),
      55                 :  * and the underlying file(s) are made with OpenTemporaryFile, all resources
      56                 :  * for a logical tape set are certain to be cleaned up even if processing
      57                 :  * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
      58                 :  * care that all calls for a single LogicalTapeSet are made in the same
      59                 :  * palloc context.
      60                 :  *
      61                 :  * To support parallel sort operations involving coordinated callers to
      62                 :  * tuplesort.c routines across multiple workers, it is necessary to
      63                 :  * concatenate each worker BufFile/tapeset into one single logical tapeset
      64                 :  * managed by the leader.  Workers should have produced one final
      65                 :  * materialized tape (their entire output) when this happens in leader.
      66                 :  * There will always be the same number of runs as input tapes, and the same
      67                 :  * number of input tapes as participants (worker Tuplesortstates).
      68                 :  *
      69                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      70                 :  * Portions Copyright (c) 1994, Regents of the University of California
      71                 :  *
      72                 :  * IDENTIFICATION
      73                 :  *    src/backend/utils/sort/logtape.c
      74                 :  *
      75                 :  *-------------------------------------------------------------------------
      76                 :  */
      77                 : 
      78                 : #include "postgres.h"
      79                 : 
      80                 : #include <fcntl.h>
      81                 : 
      82                 : #include "storage/buffile.h"
      83                 : #include "utils/builtins.h"
      84                 : #include "utils/logtape.h"
      85                 : #include "utils/memdebug.h"
      86                 : #include "utils/memutils.h"
      87                 : 
      88                 : /*
      89                 :  * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
      90                 :  *
      91                 :  * The first block of a tape has prev == -1.  The last block of a tape
      92                 :  * stores the number of valid bytes on the block, negated, in 'next'.
      93                 :  * Therefore next < 0 indicates the last block.
      94                 :  */
      95                 : typedef struct TapeBlockTrailer
      96                 : {
      97                 :     long        prev;           /* previous block on this tape, or -1 on first
      98                 :                                  * block */
      99                 :     long        next;           /* next block on this tape, or # of valid
     100                 :                                  * bytes on last block (if < 0) */
     101                 : } TapeBlockTrailer;
     102                 : 
     103                 : #define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))
     104                 : #define TapeBlockGetTrailer(buf) \
     105                 :     ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))
     106                 : 
     107                 : #define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
     108                 : #define TapeBlockGetNBytes(buf) \
     109                 :     (TapeBlockIsLast(buf) ? \
     110                 :      (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
     111                 : #define TapeBlockSetNBytes(buf, nbytes) \
     112                 :     (TapeBlockGetTrailer(buf)->next = -(nbytes))
     113                 : 
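The trailer encoding above can be illustrated with a small standalone sketch. It is not the PostgreSQL code: the `Sketch*` names are hypothetical, and the 8192-byte block size is an assumption (BLCKSZ is a build-time setting). It shows how a negative `next` doubles as both the last-block flag and the count of valid bytes.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_BLCKSZ 8192      /* assumed block size, for illustration */

typedef struct SketchTrailer
{
    long        prev;           /* previous block, or -1 on the first block */
    long        next;           /* next block, or -(valid bytes) on the last */
} SketchTrailer;

/* Block buffer aligned well enough to hold the trailer's long fields. */
typedef union SketchBlock
{
    char        data[SKETCH_BLCKSZ];
    long        force_align;
} SketchBlock;

#define SKETCH_PAYLOAD (SKETCH_BLCKSZ - sizeof(SketchTrailer))

/* Locate the trailer stored in the final bytes of a block buffer. */
static SketchTrailer *
sketch_trailer(char *buf)
{
    return (SketchTrailer *) (buf + SKETCH_PAYLOAD);
}

/* A negative 'next' marks the last block of a tape. */
static int
sketch_is_last(char *buf)
{
    return sketch_trailer(buf)->next < 0;
}

/* Valid payload bytes: the negated 'next' on the last block, else a full payload. */
static size_t
sketch_get_nbytes(char *buf)
{
    return sketch_is_last(buf) ?
        (size_t) (-sketch_trailer(buf)->next) : SKETCH_PAYLOAD;
}

/* Mark a block as last; nbytes must be nonzero, since -0 would not be < 0. */
static void
sketch_set_nbytes(char *buf, size_t nbytes)
{
    assert(nbytes > 0 && nbytes <= SKETCH_PAYLOAD);
    sketch_trailer(buf)->next = -(long) nbytes;
}
```

A caller would declare a `SketchBlock`, fill `data` with payload, and call `sketch_set_nbytes()` only on the final block of a tape; every other block keeps a non-negative block number in `next`.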
     114                 : /*
     115                 :  * When multiple tapes are being written to concurrently (as in HashAgg),
     116                 :  * avoid excessive fragmentation by preallocating block numbers to individual
     117                 :  * tapes. Each preallocation doubles in size starting at
     118                 :  * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks.
     119                 :  *
     120                 :  * No filesystem operations are performed for preallocation; only the block
     121                 :  * numbers are reserved. This may lead to sparse writes, which will cause
     122                 :  * ltsWriteBlock() to fill in holes with zeros.
     123                 :  */
     124                 : #define TAPE_WRITE_PREALLOC_MIN 8
     125                 : #define TAPE_WRITE_PREALLOC_MAX 128
     126                 : 
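The doubling schedule described above can be sketched in isolation as follows. The function name and the `SKETCH_*` constants are hypothetical stand-ins that mirror the two defines; the sketch models only the batch-size progression, not the block reservation itself.

```c
#include <assert.h>

#define SKETCH_PREALLOC_MIN 8
#define SKETCH_PREALLOC_MAX 128

/*
 * Return the batch size for the next refill, given the current batch size
 * (0 means nothing has been preallocated yet).
 */
static int
sketch_next_prealloc_size(int current)
{
    assert(current >= 0);

    if (current == 0)
        return SKETCH_PREALLOC_MIN;

    if (current < SKETCH_PREALLOC_MAX)
    {
        /* double on each refill, clamped to the maximum */
        current *= 2;
        if (current > SKETCH_PREALLOC_MAX)
            current = SKETCH_PREALLOC_MAX;
    }
    return current;
}
```

Starting from 0, repeated calls yield 8, 16, 32, 64, 128, and then stay at 128, so a tape that keeps growing settles on batches of 128 block numbers.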
     127                 : /*
     128                 :  * This data structure represents a single "logical tape" within the set
     129                 :  * of logical tapes stored in the same file.
     130                 :  *
     131                 :  * While writing, we hold the current partially-written data block in the
     132                 :  * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
     133                 :  * that we don't retain the trailers of a block when it's read into the
     134                 :  * buffer.  The buffer therefore contains one large contiguous chunk of data
     135                 :  * from the tape.
     136                 :  */
     137                 : struct LogicalTape
     138                 : {
     139                 :     LogicalTapeSet *tapeSet;    /* tape set this tape is part of */
     140                 : 
     141                 :     bool        writing;        /* T while in write phase */
     142                 :     bool        frozen;         /* T if blocks should not be freed when read */
     143                 :     bool        dirty;          /* does buffer need to be written? */
     144                 : 
     145                 :     /*
     146                 :      * Block numbers of the first, current, and next block of the tape.
     147                 :      *
     148                 :      * The "current" block number is only valid when writing, or reading from
     149                 :      * a frozen tape.  (When reading from an unfrozen tape, we use a larger
     150                 :      * read buffer that holds multiple blocks, so the "current" block is
     151                 :      * ambiguous.)
     152                 :      *
     153                 :      * When concatenation of worker tape BufFiles is performed, an offset to
     154                 :      * the first block in the unified BufFile space is applied during reads.
     155                 :      */
     156                 :     long        firstBlockNumber;
     157                 :     long        curBlockNumber;
     158                 :     long        nextBlockNumber;
     159                 :     long        offsetBlockNumber;
     160                 : 
     161                 :     /*
     162                 :      * Buffer for current data block(s).
     163                 :      */
     164                 :     char       *buffer;         /* physical buffer (separately palloc'd) */
     165                 :     int         buffer_size;    /* allocated size of the buffer */
     166                 :     int         max_size;       /* highest useful, safe buffer_size */
     167                 :     int         pos;            /* next read/write position in buffer */
     168                 :     int         nbytes;         /* total # of valid bytes in buffer */
     169                 : 
     170                 :     /*
     171                 :      * Preallocated block numbers are held in an array sorted in descending
     172                 :      * order; blocks are consumed from the end of the array (lowest block
     173                 :      * numbers first).
     174                 :      */
     175                 :     long       *prealloc;
     176                 :     int         nprealloc;      /* number of elements in list */
     177                 :     int         prealloc_size;  /* number of elements list can hold */
     178                 : };
     179                 : 
     180                 : /*
     181                 :  * This data structure represents a set of related "logical tapes" sharing
     182                 :  * space in a single underlying file.  (But that "file" may be multiple files
     183                 :  * if needed to escape OS limits on file size; buffile.c handles that for us.)
     184                 :  * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
     185                 :  * demand.
     186                 :  */
     187                 : struct LogicalTapeSet
     188                 : {
     189                 :     BufFile    *pfile;          /* underlying file for whole tape set */
     190                 :     SharedFileSet *fileset;
     191                 :     int         worker;         /* worker # if shared, -1 for leader/serial */
     192                 : 
     193                 :     /*
     194                 :      * File size tracking.  nBlocksWritten is the size of the underlying file,
     195                 :      * in BLCKSZ blocks.  nBlocksAllocated is the number of blocks allocated
     196                 :      * by ltsGetFreeBlock(), and it is always greater than or equal to
     197                 :      * nBlocksWritten.  Blocks between nBlocksWritten and nBlocksAllocated are
     198                 :      * blocks that have been allocated for a tape, but have not been written
     199                 :      * to the underlying file yet.  nHoleBlocks tracks the total number of
     200                 :      * blocks that are in unused holes between worker spaces following BufFile
     201                 :      * concatenation.
     202                 :      */
     203                 :     long        nBlocksAllocated;   /* # of blocks allocated */
     204                 :     long        nBlocksWritten; /* # of blocks used in underlying file */
     205                 :     long        nHoleBlocks;    /* # of "hole" blocks left */
     206                 : 
     207                 :     /*
     208                 :      * We store the numbers of recycled-and-available blocks in freeBlocks[].
     209                 :      * When there are no such blocks, we extend the underlying file.
     210                 :      *
     211                 :      * If forgetFreeSpace is true then any freed blocks are simply forgotten
     212                 :      * rather than being remembered in freeBlocks[].  See notes for
     213                 :      * LogicalTapeSetForgetFreeSpace().
     214                 :      */
     215                 :     bool        forgetFreeSpace;    /* are we forgetting freed blocks? */
     216                 :     long       *freeBlocks;     /* resizable array holding minheap */
     217                 :     long        nFreeBlocks;    /* # of currently free blocks */
     218                 :     Size        freeBlocksLen;  /* current allocated length of freeBlocks[] */
     219                 :     bool        enable_prealloc;    /* preallocate write blocks? */
     220                 : };
     221                 : 
     222                 : static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
     223                 : static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, const void *buffer);
     224                 : static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
     225                 : static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
     226                 : static long ltsGetFreeBlock(LogicalTapeSet *lts);
     227                 : static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
     228                 : static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
     229                 : static void ltsInitReadBuffer(LogicalTape *lt);
     230                 : 
     231                 : 
     232                 : /*
     233                 :  * Write a block-sized buffer to the specified block of the underlying file.
     234                 :  *
     235                 :  * No need for an error return convention; we ereport() on any error.
     236                 :  */
     237                 : static void
     238 GNC       28343 : ltsWriteBlock(LogicalTapeSet *lts, long blocknum, const void *buffer)
     239                 : {
     240                 :     /*
     241                 :      * BufFile does not support "holes", so if we're about to write a block
     242                 :      * that's past the current end of file, fill the space between the current
     243                 :      * end of file and the target block with zeros.
     244                 :      *
     245                 :      * This can happen either when tapes preallocate blocks, or for the last
     246                 :      * block of a tape, which might not have been flushed.
     247                 :      *
     248                 :      * Note that BufFile concatenation can leave "holes" in BufFile between
     249                 :      * worker-owned block ranges.  These are tracked for reporting purposes
     250                 :      * only.  We never read from nor write to these hole blocks, and so they
     251                 :      * are not considered here.
     252                 :      */
     253 CBC       30569 :     while (blocknum > lts->nBlocksWritten)
     254                 :     {
     255                 :         PGIOAlignedBlock zerobuf;
     256                 : 
     257            2226 :         MemSet(zerobuf.data, 0, sizeof(zerobuf));
     258                 : 
     259            2226 :         ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
     260                 :     }
     261                 : 
     262                 :     /* Write the requested block */
     263           28343 :     if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
     264 UBC           0 :         ereport(ERROR,
     265                 :                 (errcode_for_file_access(),
     266                 :                  errmsg("could not seek to block %ld of temporary file",
     267                 :                         blocknum)));
     268 CBC       28343 :     BufFileWrite(lts->pfile, buffer, BLCKSZ);
     269                 : 
     270                 :     /* Update nBlocksWritten, if we extended the file */
     271           28343 :     if (blocknum == lts->nBlocksWritten)
     272            9552 :         lts->nBlocksWritten++;
     273           28343 : }
     274                 : 
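The hole-filling rule in ltsWriteBlock() can be modelled without any file I/O. The sketch below is an illustration only: `SketchFile` and `sketch_write_block` are hypothetical names, and a loop counter stands in for the recursive zero-block writes in the real function.

```c
#include <assert.h>

typedef struct SketchFile
{
    long        nBlocksWritten; /* current length of the file, in blocks */
    long        nZeroFilled;    /* filler blocks written so far */
} SketchFile;

/* Simulate writing one block at position blocknum. */
static void
sketch_write_block(SketchFile *f, long blocknum)
{
    assert(blocknum >= 0);

    /* Fill any gap between end of file and the target with zero blocks. */
    while (blocknum > f->nBlocksWritten)
    {
        f->nBlocksWritten++;
        f->nZeroFilled++;
    }

    /* "Write" the requested block; only an append extends the file. */
    if (blocknum == f->nBlocksWritten)
        f->nBlocksWritten++;
}
```

Writing block 4 into a one-block file first emits three zero blocks (1, 2, 3) and then appends block 4, while a later write to block 2 overwrites in place and leaves the file length unchanged.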
     275                 : /*
     276                 :  * Read a block-sized buffer from the specified block of the underlying file.
     277                 :  *
     278                 :  * No need for an error return convention; we ereport() on any error.   This
     279                 :  * module should never attempt to read a block it doesn't know is there.
     280                 :  */
     281                 : static void
     282           25938 : ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
     283                 : {
     284 GIC       25938 :     if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
     285 UIC           0 :         ereport(ERROR,
     286                 :                 (errcode_for_file_access(),
     287 ECB             :                  errmsg("could not seek to block %ld of temporary file",
     288                 :                         blocknum)));
     289 GNC       25938 :     BufFileReadExact(lts->pfile, buffer, BLCKSZ);
     290 GIC       25938 : }
     291 ECB             : 
     292                 : /*
     293                 :  * Read as many blocks as we can into the per-tape buffer.
     294                 :  *
     295                 :  * Returns true if anything was read, false on EOF.
     296                 :  */
     297                 : static bool
     298 CBC       31836 : ltsReadFillBuffer(LogicalTape *lt)
     299 ECB             : {
     300 GIC       31836 :     lt->pos = 0;
     301           31836 :     lt->nbytes = 0;
     302 ECB             : 
     303                 :     do
     304                 :     {
     305 CBC       39190 :         char       *thisbuf = lt->buffer + lt->nbytes;
     306 GIC       39190 :         long        datablocknum = lt->nextBlockNumber;
     307                 : 
     308 ECB             :         /* Fetch next block number */
     309 CBC       39190 :         if (datablocknum == -1L)
     310           13506 :             break;              /* EOF */
     311 ECB             :         /* Apply worker offset, needed for leader tapesets */
     312 GIC       25684 :         datablocknum += lt->offsetBlockNumber;
     313 ECB             : 
     314                 :         /* Read the block */
     315 GNC       25684 :         ltsReadBlock(lt->tapeSet, datablocknum, thisbuf);
     316 CBC       25684 :         if (!lt->frozen)
     317 GIC       25333 :             ltsReleaseBlock(lt->tapeSet, datablocknum);
     318 CBC       25684 :         lt->curBlockNumber = lt->nextBlockNumber;
     319                 : 
     320 GIC       25684 :         lt->nbytes += TapeBlockGetNBytes(thisbuf);
     321 CBC       25684 :         if (TapeBlockIsLast(thisbuf))
     322                 :         {
     323 GIC       14106 :             lt->nextBlockNumber = -1L;
     324 ECB             :             /* EOF */
     325 GIC       14106 :             break;
     326 ECB             :         }
     327                 :         else
     328 GIC       11578 :             lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
     329                 : 
     330 ECB             :         /* Advance to next block, if we have buffer space left */
     331 GIC       11578 :     } while (lt->buffer_size - lt->nbytes > BLCKSZ);
     332 ECB             : 
     333 GIC       31836 :     return (lt->nbytes > 0);
     334                 : }
     335                 : 
     336 ECB             : static inline unsigned long
     337 GIC      773128 : left_offset(unsigned long i)
     338 ECB             : {
     339 GIC      773128 :     return 2 * i + 1;
     340                 : }
     341                 : 
     342 ECB             : static inline unsigned long
     343 GIC      773128 : right_offset(unsigned long i)
     344 ECB             : {
     345 GIC      773128 :     return 2 * i + 2;
     346                 : }
     347                 : 
     348                 : static inline unsigned long
     349          479780 : parent_offset(unsigned long i)
     350                 : {
     351 CBC      479780 :     return (i - 1) / 2;
     352                 : }
     353 ECB             : 
     354                 : /*
     355                 :  * Get the next block for writing.
     356                 :  */
     357                 : static long
     358 GIC       26117 : ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
     359                 : {
     360           26117 :     if (lts->enable_prealloc)
     361           14037 :         return ltsGetPreallocBlock(lts, lt);
     362                 :     else
     363           12080 :         return ltsGetFreeBlock(lts);
     364 ECB             : }
     365                 : 
     366                 : /*
     367                 :  * Select the lowest currently unused block from the tape set's global free
     368                 :  * list min heap.
     369                 :  */
     370                 : static long
     371 GIC      120320 : ltsGetFreeBlock(LogicalTapeSet *lts)
     372                 : {
     373 CBC      120320 :     long       *heap = lts->freeBlocks;
     374 ECB             :     long        blocknum;
     375                 :     int         heapsize;
     376                 :     long        holeval;
     377                 :     unsigned long holepos;
     378                 : 
     379                 :     /* freelist empty; allocate a new block */
     380 CBC      120320 :     if (lts->nFreeBlocks == 0)
     381 GIC        9780 :         return lts->nBlocksAllocated++;
     382                 : 
     383                 :     /* easy if heap contains one element */
     384 CBC      110540 :     if (lts->nFreeBlocks == 1)
     385                 :     {
     386 GIC         480 :         lts->nFreeBlocks--;
     387 CBC         480 :         return lts->freeBlocks[0];
     388                 :     }
     389                 : 
     390 ECB             :     /* remove top of minheap */
     391 CBC      110060 :     blocknum = heap[0];
     392                 : 
     393 ECB             :     /* we'll replace it with end of minheap array */
     394 CBC      110060 :     holeval = heap[--lts->nFreeBlocks];
     395 ECB             : 
     396                 :     /* sift down */
     397 GIC      110060 :     holepos = 0;                /* holepos is where the "hole" is */
     398 CBC      110060 :     heapsize = lts->nFreeBlocks;
     399 ECB             :     while (true)
     400 CBC      663068 :     {
     401          773128 :         unsigned long left = left_offset(holepos);
     402          773128 :         unsigned long right = right_offset(holepos);
     403 EUB             :         unsigned long min_child;
     404                 : 
     405 CBC      773128 :         if (left < heapsize && right < heapsize)
     406 GIC      668982 :             min_child = (heap[left] < heap[right]) ? left : right;
     407 CBC      104146 :         else if (left < heapsize)
     408           22255 :             min_child = left;
     409 GIC       81891 :         else if (right < heapsize)
     410 LBC           0 :             min_child = right;
     411 ECB             :         else
     412 GIC       81891 :             break;
     413 ECB             : 
     414 GIC      691237 :         if (heap[min_child] >= holeval)
     415 CBC       28169 :             break;
     416                 : 
     417 GIC      663068 :         heap[holepos] = heap[min_child];
     418          663068 :         holepos = min_child;
     419                 :     }
     420          110060 :     heap[holepos] = holeval;
     421                 : 
     422          110060 :     return blocknum;
     423                 : }
     424 ECB             : 
     425                 : /*
     426                 :  * Return the lowest free block number from the tape's preallocation list.
     427                 :  * Refill the preallocation list with blocks from the tape set's free list if
     428                 :  * necessary.
     429                 :  */
     430                 : static long
     431 GIC       14037 : ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
     432 ECB             : {
     433                 :     /* sorted in descending order, so return the last element */
     434 GIC       14037 :     if (lt->nprealloc > 0)
     435 CBC         519 :         return lt->prealloc[--lt->nprealloc];
     436                 : 
     437 GIC       13518 :     if (lt->prealloc == NULL)
     438 ECB             :     {
     439 CBC       13506 :         lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
     440 GBC       13506 :         lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size);
     441 ECB             :     }
     442 CBC          12 :     else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
     443                 :     {
     444                 :         /* when the preallocation list runs out, double the size */
     445 GIC          12 :         lt->prealloc_size *= 2;
     446 CBC          12 :         if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX)
     447 LBC           0 :             lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX;
     448 GIC          12 :         lt->prealloc = (long *) repalloc(lt->prealloc,
     449 CBC          12 :                                          sizeof(long) * lt->prealloc_size);
     450                 :     }
     451                 : 
     452 ECB             :     /* refill preallocation list */
     453 GIC       13518 :     lt->nprealloc = lt->prealloc_size;
     454          121758 :     for (int i = lt->nprealloc; i > 0; i--)
     455 ECB             :     {
     456 GIC      108240 :         lt->prealloc[i - 1] = ltsGetFreeBlock(lts);
     457                 : 
     458                 :         /* verify descending order */
     459          108240 :         Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
     460                 :     }
     461                 : 
     462 CBC       13518 :     return lt->prealloc[--lt->nprealloc];
     463                 : }
     464                 : 
     465                 : /*
     466                 :  * Return a block# to the freelist.
     467                 :  */
     468                 : static void
     469 GIC      119536 : ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
     470 ECB             : {
     471                 :     long       *heap;
     472                 :     unsigned long holepos;
     473                 : 
     474                 :     /*
     475                 :      * Do nothing if we're no longer interested in remembering free space.
     476                 :      */
     477 GIC      119536 :     if (lts->forgetFreeSpace)
     478            6784 :         return;
     479                 : 
     480                 :     /*
     481                 :      * Enlarge freeBlocks array if full.
     482 ECB             :      */
     483 GBC      112752 :     if (lts->nFreeBlocks >= lts->freeBlocksLen)
     484                 :     {
     485 ECB             :         /*
     486                 :          * If the freelist becomes very large, just return and leak this free
     487                 :          * block.
     488                 :          */
     489 GIC          36 :         if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
     490 UIC           0 :             return;
     491 ECB             : 
     492 CBC          36 :         lts->freeBlocksLen *= 2;
     493              36 :         lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
     494 GIC          36 :                                             lts->freeBlocksLen * sizeof(long));
     495                 :     }
     496 ECB             : 
     497                 :     /* create a "hole" at end of minheap array */
     498 CBC      112752 :     heap = lts->freeBlocks;
     499 GIC      112752 :     holepos = lts->nFreeBlocks;
     500 CBC      112752 :     lts->nFreeBlocks++;
     501 ECB             : 
     502                 :     /* sift up to insert blocknum */
     503 CBC      497598 :     while (holepos != 0)
     504 ECB             :     {
     505 GIC      479780 :         unsigned long parent = parent_offset(holepos);
     506 ECB             : 
     507 GIC      479780 :         if (heap[parent] < blocknum)
     508           94934 :             break;
     509                 : 
     510          384846 :         heap[holepos] = heap[parent];
     511          384846 :         holepos = parent;
     512                 :     }
     513          112752 :     heap[holepos] = blocknum;
     514                 : }
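
Sketch (not part of logtape.c): the sift-up insertion performed by ltsReleaseBlock() above, isolated as a standalone min-heap insert. The names heap_parent() and minheap_insert() are hypothetical, and the parent computation (i - 1) / 2 is an assumption about what parent_offset() does. Unlike the original, this version requires the caller to supply enough capacity instead of growing the array.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for parent_offset(); assumed to be (i - 1) / 2. */
static size_t
heap_parent(size_t i)
{
    return (i - 1) / 2;
}

/*
 * Insert value into a min-heap stored in heap[0 .. *n - 1].
 * The caller must guarantee room for one more element.
 */
static void
minheap_insert(long *heap, size_t *n, size_t capacity, long value)
{
    size_t      holepos;

    assert(*n < capacity);

    /* open a "hole" at the end of the array */
    holepos = (*n)++;

    /* pull parents down into the hole until value's slot is found */
    while (holepos != 0)
    {
        size_t      parent = heap_parent(holepos);

        if (heap[parent] < value)
            break;

        heap[holepos] = heap[parent];
        holepos = parent;
    }
    heap[holepos] = value;
}
```

Because the smallest block number always sits at heap[0], a free-block allocator built this way can prefer low-numbered blocks when reusing space.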
     515 ECB             : 
     516                 : /*
     517                 :  * Lazily allocate and initialize the read buffer. This avoids waste when many
     518                 :  * tapes are open at once, but not all are active between rewinding and
     519                 :  * reading.
     520                 :  */
     521                 : static void
     522 CBC       14109 : ltsInitReadBuffer(LogicalTape *lt)
     523 ECB             : {
     524 CBC       14109 :     Assert(lt->buffer_size > 0);
     525           14109 :     lt->buffer = palloc(lt->buffer_size);
     526                 : 
     527                 :     /* Read the first block, or reset if tape is empty */
     528 GIC       14109 :     lt->nextBlockNumber = lt->firstBlockNumber;
     529           14109 :     lt->pos = 0;
     530           14109 :     lt->nbytes = 0;
     531           14109 :     ltsReadFillBuffer(lt);
     532           14109 : }
     533                 : 
     534                 : /*
     535                 :  * Create a tape set, backed by a temporary underlying file.
     536                 :  *
     537                 :  * The tape set is initially empty. Use LogicalTapeCreate() to create
     538                 :  * tapes in it.
     539                 :  *
     540                 :  * In a single-process sort, pass NULL argument for fileset, and -1 for
     541                 :  * worker.
     542                 :  *
     543                 :  * In a parallel sort, parallel workers pass the shared fileset handle and
     544                 :  * their own worker number.  After the workers have finished, create the
     545                 :  * tape set in the leader, passing the shared fileset handle and -1 for
     546                 :  * worker, and use LogicalTapeImport() to import the worker tapes into it.
     547                 :  *
     548                 :  * Currently, the leader will only import worker tapes into the set; it does
     549 ECB             :  * not create tapes of its own, although in principle that should work.
     550                 :  *
     551                 :  * If preallocate is true, blocks for each individual tape are allocated in
     552                 :  * batches.  This avoids fragmentation when writing multiple tapes at the
     553                 :  * same time.
     554                 :  */
     555                 : LogicalTapeSet *
     556 CBC         367 : LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
     557 ECB             : {
     558                 :     LogicalTapeSet *lts;
     559                 : 
     560                 :     /*
     561                 :      * Create top-level struct including per-tape LogicalTape structs.
     562                 :      */
     563 CBC         367 :     lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
     564             367 :     lts->nBlocksAllocated = 0L;
     565 GIC         367 :     lts->nBlocksWritten = 0L;
     566 CBC         367 :     lts->nHoleBlocks = 0L;
     567             367 :     lts->forgetFreeSpace = false;
     568 GIC         367 :     lts->freeBlocksLen = 32; /* reasonable initial guess */
     569             367 :     lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
     570             367 :     lts->nFreeBlocks = 0;
     571             367 :     lts->enable_prealloc = preallocate;
     572                 : 
     573             367 :     lts->fileset = fileset;
     574             367 :     lts->worker = worker;
     575                 : 
     576                 :     /*
     577                 :      * Create temp BufFile storage as required.
     578 ECB             :      *
     579                 :      * In leader, we hijack the BufFile of the first tape that's imported, and
     580                 :      * concatenate the BufFiles of any subsequent tapes to that. Hence don't
     581                 :      * create a BufFile here. Things are simpler for the worker case and the
     582                 :      * serial case, though.  They are generally very similar -- workers use a
     583                 :      * shared fileset, whereas serial sorts use a conventional serial BufFile.
     584                 :      */
     585 CBC         367 :     if (fileset && worker == -1)
     586 GIC          71 :         lts->pfile = NULL;
     587             296 :     else if (fileset)
     588 ECB             :     {
     589                 :         char        filename[MAXPGPATH];
     590                 : 
     591 GIC         206 :         pg_itoa(worker, filename);
     592             206 :         lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
     593                 :     }
     594                 :     else
     595              90 :         lts->pfile = BufFileCreateTemp(false);
     596                 : 
     597             367 :     return lts;
     598                 : }
     599                 : 
     600                 : /*
     601                 :  * Claim ownership of a logical tape from an existing shared BufFile.
     602 ECB             :  *
     603                 :  * Caller should be leader process.  Though tapes are marked as frozen in
     604                 :  * workers, they are not frozen when opened within leader, since unfrozen tapes
     605                 :  * use a larger read buffer. (Frozen tapes have a smaller read buffer, optimized
     606                 :  * for random access.)
     607                 :  */
     608                 : LogicalTape *
     609 GIC         142 : LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
     610 ECB             : {
     611                 :     LogicalTape *lt;
     612                 :     long        tapeblocks;
     613                 :     char        filename[MAXPGPATH];
     614                 :     BufFile    *file;
     615                 :     int64       filesize;
     616                 : 
     617 CBC         142 :     lt = ltsCreateTape(lts);
     618 ECB             : 
     619                 :     /*
     620                 :      * Build a concatenated view of all BufFiles, remembering the block number
     621                 :      * where each source file begins.
     622                 :      */
     623 GIC         142 :     pg_itoa(worker, filename);
     624 CBC         142 :     file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
     625             142 :     filesize = BufFileSize(file);
     626                 : 
     627 ECB             :     /*
     628                 :      * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
     629                 :      * block offset into each tape as we go.
     630                 :      */
     631 GIC         142 :     lt->firstBlockNumber = shared->firstblocknumber;
     632 CBC         142 :     if (lts->pfile == NULL)
     633                 :     {
     634 GIC          71 :         lts->pfile = file;
     635 CBC          71 :         lt->offsetBlockNumber = 0L;
     636 ECB             :     }
     637                 :     else
     638                 :     {
     639 GIC          71 :         lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
     640                 :     }
     641                 :     /* Don't allocate more for read buffer than could possibly help */
     642             142 :     lt->max_size = Min(MaxAllocSize, filesize);
     643             142 :     tapeblocks = filesize / BLCKSZ;
     644                 : 
     645 ECB             :     /*
     646                 :      * Update # of allocated blocks and # blocks written to reflect the
     647                 :      * imported BufFile.  Allocated/written blocks include space used by holes
     648                 :      * left between concatenated BufFiles.  Also track the number of hole
     649                 :      * blocks so that we can later work backwards to calculate the number of
     650                 :      * physical blocks for instrumentation.
     651                 :      */
     652 GIC         142 :     lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
     653                 : 
     654             142 :     lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
     655             142 :     lts->nBlocksWritten = lts->nBlocksAllocated;
     656                 : 
     657             142 :     return lt;
     658                 : }
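
Sketch (not part of logtape.c): the block accounting done at the end of LogicalTapeImport() above, as plain arithmetic. SketchCounts and sketch_import() are hypothetical names. The starting offset of each appended file is taken as an input here, standing in for the value returned by BufFileAppend(). The final subtraction (allocated minus holes) is an assumption based on the comment about working backwards to the number of physical blocks.

```c
#include <assert.h>

/* Hypothetical counters mirroring nBlocksAllocated and nHoleBlocks. */
typedef struct SketchCounts
{
    long        allocated;      /* blocks spanned, including holes */
    long        holes;          /* unused blocks between imported files */
} SketchCounts;

/*
 * Account for one imported file that starts at block 'offset' in the
 * concatenated view and is 'tapeblocks' blocks long.
 */
static void
sketch_import(SketchCounts *c, long offset, long tapeblocks)
{
    /* an imported file never starts before the space already accounted for */
    assert(offset >= c->allocated);

    /* any gap between the previous end and this file's start is a hole */
    c->holes += offset - c->allocated;
    c->allocated = offset + tapeblocks;
}

/* Assumed definition of physical blocks: everything that is not a hole. */
static long
sketch_physical_blocks(const SketchCounts *c)
{
    return c->allocated - c->holes;
}
```

For example, importing a 10-block file at offset 0 and then a 5-block file at offset 16 leaves 21 allocated blocks, 6 hole blocks and 15 physical blocks.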
     659                 : 
     660 ECB             : /*
     661                 :  * Close a logical tape set and release all resources.
     662                 :  *
     663                 :  * NOTE: This doesn't close any of the tapes!  You must close them
     664                 :  * first, or you can let them be destroyed along with the memory context.
     665                 :  */
     666                 : void
     667 GIC         367 : LogicalTapeSetClose(LogicalTapeSet *lts)
     668                 : {
     669             367 :     BufFileClose(lts->pfile);
     670             367 :     pfree(lts->freeBlocks);
     671             367 :     pfree(lts);
     672             367 : }
     673 ECB             : 
     674                 : /*
     675                 :  * Create a logical tape in the given tapeset.
     676                 :  *
     677                 :  * The tape is initialized in write state.
     678                 :  */
     679                 : LogicalTape *
     680 GIC       25945 : LogicalTapeCreate(LogicalTapeSet *lts)
     681                 : {
     682 ECB             :     /*
     683 EUB             :      * The only thing that currently prevents creating new tapes in leader is
     684                 :      * the fact that BufFiles opened using BufFileOpenFileSet() are read-only
     685 ECB             :      * by definition, but that could be changed if it seemed worthwhile.  For
     686                 :      * now, writing to the leader tape will raise a "Bad file descriptor"
     687                 :      * error, so tuplesort must avoid writing to the leader tape altogether.
     688                 :      */
     689 CBC       25945 :     if (lts->fileset && lts->worker == -1)
     690 UIC           0 :         elog(ERROR, "cannot create new tapes in leader process");
     691                 : 
     692 GIC       25945 :     return ltsCreateTape(lts);
     693                 : }
     694                 : 
     695                 : static LogicalTape *
     696 CBC       26087 : ltsCreateTape(LogicalTapeSet *lts)
     697 ECB             : {
     698                 :     LogicalTape *lt;
     699                 : 
     700                 :     /*
     701                 :      * Create per-tape struct.  Note we allocate the I/O buffer lazily.
     702                 :      */
     703 CBC       26087 :     lt = palloc(sizeof(LogicalTape));
     704           26087 :     lt->tapeSet = lts;
     705           26087 :     lt->writing = true;
     706           26087 :     lt->frozen = false;
     707 GIC       26087 :     lt->dirty = false;
     708 CBC       26087 :     lt->firstBlockNumber = -1L;
     709           26087 :     lt->curBlockNumber = -1L;
     710           26087 :     lt->nextBlockNumber = -1L;
     711           26087 :     lt->offsetBlockNumber = 0L;
     712           26087 :     lt->buffer = NULL;
     713           26087 :     lt->buffer_size = 0;
     714                 :     /* palloc() larger than MaxAllocSize would fail */
     715           26087 :     lt->max_size = MaxAllocSize;
     716 GIC       26087 :     lt->pos = 0;
     717           26087 :     lt->nbytes = 0;
     718           26087 :     lt->prealloc = NULL;
     719           26087 :     lt->nprealloc = 0;
     720           26087 :     lt->prealloc_size = 0;
     721                 : 
     722           26087 :     return lt;
     723                 : }
     724                 : 
     725                 : /*
     726 ECB             :  * Close a logical tape.
     727                 :  *
     728                 :  * Note: This doesn't return any blocks to the free list!  You must read
     729                 :  * the tape to the end first, to reuse the space.  In current use, though,
     730                 :  * we only close tapes after fully reading them.
     731                 :  */
     732                 : void
     733 GIC       13980 : LogicalTapeClose(LogicalTape *lt)
     734                 : {
     735           13980 :     if (lt->buffer)
     736           13980 :         pfree(lt->buffer);
     737           13980 :     pfree(lt);
     738           13980 : }
     739                 : 
     740                 : /*
     741                 :  * Mark a logical tape set as not needing management of free space anymore.
     742                 :  *
     743 ECB             :  * This should be called if the caller does not intend to write any more data
     744                 :  * into the tape set, but is reading from un-frozen tapes.  Since no more
     745                 :  * writes are planned, remembering free blocks is no longer useful.  Setting
     746                 :  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
     747                 :  * is not designed to handle large numbers of free blocks.
     748                 :  */
     749                 : void
     750 GIC         119 : LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
     751                 : {
     752             119 :     lts->forgetFreeSpace = true;
     753             119 : }
     754 ECB             : 
     755                 : /*
     756                 :  * Write to a logical tape.
     757                 :  *
     758                 :  * There are no error returns; we ereport() on failure.
     759                 :  */
     760                 : void
     761 GNC     9297487 : LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
     762                 : {
     763 CBC     9297487 :     LogicalTapeSet *lts = lt->tapeSet;
     764                 :     size_t      nthistime;
     765 ECB             : 
     766 CBC     9297487 :     Assert(lt->writing);
     767 GIC     9297487 :     Assert(lt->offsetBlockNumber == 0L);
     768 ECB             : 
     769                 :     /* Allocate data buffer and first block on first write */
     770 CBC     9297487 :     if (lt->buffer == NULL)
     771 ECB             :     {
     772 GIC       14179 :         lt->buffer = (char *) palloc(BLCKSZ);
     773 CBC       14179 :         lt->buffer_size = BLCKSZ;
     774 ECB             :     }
     775 GIC     9297487 :     if (lt->curBlockNumber == -1)
     776 ECB             :     {
     777 GIC       14179 :         Assert(lt->firstBlockNumber == -1);
     778           14179 :         Assert(lt->pos == 0);
     779 ECB             : 
     780 CBC       14179 :         lt->curBlockNumber = ltsGetBlock(lts, lt);
     781 GIC       14179 :         lt->firstBlockNumber = lt->curBlockNumber;
     782 ECB             : 
     783 GIC       14179 :         TapeBlockGetTrailer(lt->buffer)->prev = -1L;
     784                 :     }
     785                 : 
     786         9297487 :     Assert(lt->buffer_size == BLCKSZ);
     787 CBC    18602804 :     while (size > 0)
     788                 :     {
     789 GIC     9305317 :         if (lt->pos >= (int) TapeBlockPayloadSize)
     790 EUB             :         {
     791                 :             /* Buffer full, dump it out */
     792                 :             long        nextBlockNumber;
     793                 : 
     794 GIC       11938 :             if (!lt->dirty)
     795                 :             {
     796                 :                 /* Hmm, went directly from reading to writing? */
     797 LBC           0 :                 elog(ERROR, "invalid logtape state: should be dirty");
     798                 :             }
     799                 : 
     800 ECB             :             /*
     801                 :              * First allocate the next block, so that we can store it in the
     802                 :              * 'next' pointer of this block.
     803                 :              */
     804 CBC       11938 :             nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
     805 ECB             : 
     806                 :             /* set the next-pointer and dump the current block. */
     807 CBC       11938 :             TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
     808 GNC       11938 :             ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
     809                 : 
     810 ECB             :             /* initialize the prev-pointer of the next block */
     811 CBC       11938 :             TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
     812           11938 :             lt->curBlockNumber = nextBlockNumber;
     813           11938 :             lt->pos = 0;
     814 GIC       11938 :             lt->nbytes = 0;
     815 ECB             :         }
     816                 : 
     817 CBC     9305317 :         nthistime = TapeBlockPayloadSize - lt->pos;
     818         9305317 :         if (nthistime > size)
     819         9293376 :             nthistime = size;
     820         9305317 :         Assert(nthistime > 0);
     821 ECB             : 
     822 CBC     9305317 :         memcpy(lt->buffer + lt->pos, ptr, nthistime);
     823                 : 
     824         9305317 :         lt->dirty = true;
     825 GIC     9305317 :         lt->pos += nthistime;
     826         9305317 :         if (lt->nbytes < lt->pos)
     827         9305317 :             lt->nbytes = lt->pos;
     828 GNC     9305317 :         ptr = (const char *) ptr + nthistime;
     829 GIC     9305317 :         size -= nthistime;
     830                 :     }
     831         9297487 : }
     832                 : 
     833                 : /*
     834                 :  * Rewind logical tape and switch from writing to reading.
     835                 :  *
     836                 :  * The tape must currently be in writing state, or "frozen" in read state.
     837                 :  *
     838                 :  * 'buffer_size' specifies how much memory to use for the read buffer.
     839 ECB             :  * Regardless of the argument, the actual amount of memory used is between
     840                 :  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ.  The given value is
     841                 :  * rounded down and truncated to fit those constraints, if necessary.  If the
     842                 :  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
     843                 :  * byte buffer is used.
     844                 :  */
     845                 : void
     846 CBC       14109 : LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
     847 ECB             : {
     848 GIC       14109 :     LogicalTapeSet *lts = lt->tapeSet;
     849                 : 
     850                 :     /*
     851 ECB             :      * Round and cap buffer_size if needed.
     852                 :      */
     853 GIC       14109 :     if (lt->frozen)
     854               3 :         buffer_size = BLCKSZ;
     855 ECB             :     else
     856                 :     {
     857                 :         /* need at least one block */
     858 GIC       14106 :         if (buffer_size < BLCKSZ)
     859 CBC         288 :             buffer_size = BLCKSZ;
     860                 : 
     861                 :         /* palloc() larger than max_size is unlikely to be helpful */
     862           14106 :         if (buffer_size > lt->max_size)
     863 GIC         142 :             buffer_size = lt->max_size;
     864                 : 
     865                 :         /* round down to BLCKSZ boundary */
     866           14106 :         buffer_size -= buffer_size % BLCKSZ;
     867                 :     }
     868 ECB             : 
     869 GIC       14109 :     if (lt->writing)
     870                 :     {
     871                 :         /*
     872                 :          * Completion of a write phase.  Flush last partial data block, and
     873                 :          * rewind for normal (destructive) read.
     874                 :          */
     875           14106 :         if (lt->dirty)
     876                 :         {
     877                 :             /*
     878                 :              * As long as we've filled the buffer at least once, its contents
     879                 :              * are entirely defined from valgrind's point of view, even though
     880                 :              * contents beyond the current end point may be stale.  But it's
     881                 :              * possible - at least in the case of a parallel sort - to sort
     882 ECB             :              * such a small amount of data that we do not fill the buffer even
     883                 :              * once.  Tell valgrind that its contents are defined, so it
     884                 :              * doesn't bleat.
     885                 :              */
     886                 :             VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
     887                 :                                       lt->buffer_size - lt->nbytes);
     888                 : 
     889 GIC       13964 :             TapeBlockSetNBytes(lt->buffer, lt->nbytes);
     890 GNC       13964 :             ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
     891                 :         }
     892 GIC       14106 :         lt->writing = false;
     893 ECB             :     }
     894                 :     else
     895                 :     {
     896                 :         /*
     897                 :          * This is only OK if tape is frozen; we rewind for (another) read
     898                 :          * pass.
     899                 :          */
     900 CBC           3 :         Assert(lt->frozen);
     901 ECB             :     }
     902                 : 
     903 GIC       14109 :     if (lt->buffer)
     904 CBC       13967 :         pfree(lt->buffer);
     905                 : 
     906 ECB             :     /* the buffer is lazily allocated, but set the size here */
     907 CBC       14109 :     lt->buffer = NULL;
     908           14109 :     lt->buffer_size = buffer_size;
     909 ECB             : 
     910                 :     /* free the preallocation list, and return unused block numbers */
     911 CBC       14109 :     if (lt->prealloc != NULL)
     912                 :     {
     913          107709 :         for (int i = lt->nprealloc; i > 0; i--)
     914 GIC       94203 :             ltsReleaseBlock(lts, lt->prealloc[i - 1]);
     915           13506 :         pfree(lt->prealloc);
     916           13506 :         lt->prealloc = NULL;
     917           13506 :         lt->nprealloc = 0;
     918           13506 :         lt->prealloc_size = 0;
     919                 :     }
     920           14109 : }
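
Sketch (not part of logtape.c): the read buffer sizing rules applied at the top of LogicalTapeRewindForRead() above, as a pure function. SKETCH_BLCKSZ and clamp_read_buffer_size() are hypothetical names. The value 8192 is an assumption (the usual default block size), and the sketch additionally assumes max_size is at least one block.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_BLCKSZ 8192      /* assumption: default block size */

/*
 * Return the read buffer size that would be used for a request of
 * 'requested' bytes on a tape whose limit is 'max_size'.
 */
static size_t
clamp_read_buffer_size(size_t requested, size_t max_size, int frozen)
{
    size_t      result = requested;

    assert(max_size >= SKETCH_BLCKSZ);

    /* frozen tapes always use a single-block buffer */
    if (frozen)
        return SKETCH_BLCKSZ;

    /* need at least one block */
    if (result < SKETCH_BLCKSZ)
        result = SKETCH_BLCKSZ;

    /* never exceed the per-tape limit */
    if (result > max_size)
        result = max_size;

    /* round down to a block boundary */
    result -= result % SKETCH_BLCKSZ;

    return result;
}
```

With these assumptions, a request of 20000 bytes yields 16384, a request of 100 bytes yields 8192, and any request on a frozen tape yields 8192.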
     921 ECB             : 
     922                 : /*
     923                 :  * Read from a logical tape.
     924                 :  *
     925                 :  * Early EOF is indicated by return value less than #bytes requested.
     926                 :  */
     927                 : size_t
     928 CBC     9435087 : LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
     929 ECB             : {
     930 GIC     9435087 :     size_t      nread = 0;
     931 ECB             :     size_t      nthistime;
     932                 : 
     933 CBC     9435087 :     Assert(!lt->writing);
     934                 : 
     935 GIC     9435087 :     if (lt->buffer == NULL)
     936 CBC       14109 :         ltsInitReadBuffer(lt);
     937 ECB             : 
     938 GIC    18859374 :     while (size > 0)
     939                 :     {
     940 CBC     9437793 :         if (lt->pos >= lt->nbytes)
     941 ECB             :         {
     942                 :             /* Try to load more data into buffer. */
     943 CBC       17727 :             if (!ltsReadFillBuffer(lt))
     944 GIC       13506 :                 break;          /* EOF */
     945 ECB             :         }
     946                 : 
     947 CBC     9424287 :         nthistime = lt->nbytes - lt->pos;
     948         9424287 :         if (nthistime > size)
     949         9405963 :             nthistime = size;
     950         9424287 :         Assert(nthistime > 0);
     951                 : 
     952 GIC     9424287 :         memcpy(ptr, lt->buffer + lt->pos, nthistime);
     953 ECB             : 
     954 GIC     9424287 :         lt->pos += nthistime;
     955 GNC     9424287 :         ptr = (char *) ptr + nthistime;
     956 GIC     9424287 :         size -= nthistime;
     957         9424287 :         nread += nthistime;
     958                 :     }
     959                 : 
     960         9435087 :     return nread;
     961                 : }
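
Sketch (not part of logtape.c): the copy loop of LogicalTapeRead() above, run against an in-memory source so that the "short return means EOF" contract can be seen in isolation. SketchReader, sketch_refill() and sketch_read() are hypothetical. The 4-byte chunk stands in for a tape block, and sketch_refill() is only a stand-in for ltsReadFillBuffer(), whose real behavior is not shown in this section.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_CHUNK 4          /* tiny stand-in for a tape block */

typedef struct SketchReader
{
    const char *src;            /* backing data */
    size_t      srclen;         /* total bytes in src */
    size_t      srcpos;         /* next byte of src to load */
    char        buf[SKETCH_CHUNK];
    size_t      pos;            /* next byte of buf to hand out */
    size_t      nbytes;         /* valid bytes in buf */
} SketchReader;

/* Load the next chunk; return 0 at end of data, 1 otherwise. */
static int
sketch_refill(SketchReader *r)
{
    size_t      n = r->srclen - r->srcpos;

    if (n == 0)
        return 0;
    if (n > SKETCH_CHUNK)
        n = SKETCH_CHUNK;
    memcpy(r->buf, r->src + r->srcpos, n);
    r->srcpos += n;
    r->pos = 0;
    r->nbytes = n;
    return 1;
}

/* Copy up to 'size' bytes; a smaller return value signals EOF. */
static size_t
sketch_read(SketchReader *r, void *ptr, size_t size)
{
    size_t      nread = 0;

    while (size > 0)
    {
        size_t      nthistime;

        if (r->pos >= r->nbytes && !sketch_refill(r))
            break;              /* EOF */

        nthistime = r->nbytes - r->pos;
        if (nthistime > size)
            nthistime = size;
        assert(nthistime > 0);

        memcpy(ptr, r->buf + r->pos, nthistime);
        r->pos += nthistime;
        ptr = (char *) ptr + nthistime;
        size -= nthistime;
        nread += nthistime;
    }
    return nread;
}
```

A caller distinguishes a full read from end of data only by comparing the return value with the requested size, so a return of zero on a non-zero request means nothing is left.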
     962                 : 
     963                 : /*
     964                 :  * "Freeze" the contents of a tape so that it can be read multiple times
     965                 :  * and/or read backwards.  Once a tape is frozen, its contents will not
     966                 :  * be released until the LogicalTapeSet is destroyed.  This is expected
     967                 :  * to be used only for the final output pass of a merge.
     968                 :  *
     969                 :  * This *must* be called just at the end of a write pass, before the
     970                 :  * tape is rewound (after rewind is too late!).  It performs a rewind
     971                 :  * and switch to read mode "for free".  An immediately following rewind-
     972                 :  * for-read call is OK but not necessary.
     973                 :  *
     974 ECB             :  * share output argument is set with details of storage used for tape after
     975                 :  * freezing, which may be passed to LogicalTapeImport within leader
     976                 :  * process later.  This metadata is only of interest to worker callers
     977                 :  * freezing their final output for leader (single materialized tape).
     978                 :  * Serial sorts should set share to NULL.
     979                 :  */
     980                 : void
     981 GIC         215 : LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
     982                 : {
     983             215 :     LogicalTapeSet *lts = lt->tapeSet;
     984                 : 
     985 CBC         215 :     Assert(lt->writing);
     986 GIC         215 :     Assert(lt->offsetBlockNumber == 0L);
     987                 : 
     988                 :     /*
     989                 :      * Completion of a write phase.  Flush last partial data block, and rewind
     990                 :      * for nondestructive read.
     991                 :      */
     992             215 :     if (lt->dirty)
     993                 :     {
     994                 :         /*
     995                 :          * As long as we've filled the buffer at least once, its contents are
     996                 :          * entirely defined from valgrind's point of view, even though
     997                 :          * contents beyond the current end point may be stale.  But it's
     998 ECB             :          * possible - at least in the case of a parallel sort - to sort such
     999                 :          * a small amount of data that we do not fill the buffer even once. Tell
    1000                 :          * valgrind that its contents are defined, so it doesn't bleat.
    1001                 :          */
    1002                 :         VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
    1003                 :                                   lt->buffer_size - lt->nbytes);
    1004                 : 
    1005 GIC         215 :         TapeBlockSetNBytes(lt->buffer, lt->nbytes);
    1006 GNC         215 :         ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
    1007                 :     }
    1008 GIC         215 :     lt->writing = false;
    1009             215 :     lt->frozen = true;
    1010                 : 
    1011 ECB             :     /*
    1012                 :      * The seek and backspace functions assume a single block read buffer.
    1013 EUB             :      * That's OK with current usage.  A larger buffer is helpful to make the
    1014                 :      * read pattern of the backing file look more sequential to the OS, when
    1015                 :      * we're reading from multiple tapes.  But at the end of a sort, when a
    1016                 :      * tape is frozen, we only read from a single tape anyway.
    1017                 :      */
    1018 GIC         215 :     if (!lt->buffer || lt->buffer_size != BLCKSZ)
    1019                 :     {
    1020 LBC           0 :         if (lt->buffer)
    1021               0 :             pfree(lt->buffer);
    1022               0 :         lt->buffer = palloc(BLCKSZ);
    1023 UIC           0 :         lt->buffer_size = BLCKSZ;
    1024 ECB             :     }
    1025 EUB             : 
    1026 ECB             :     /* Read the first block, or reset if tape is empty */
    1027 CBC         215 :     lt->curBlockNumber = lt->firstBlockNumber;
    1028             215 :     lt->pos = 0;
    1029 GIC         215 :     lt->nbytes = 0;
    1030 ECB             : 
    1031 CBC         215 :     if (lt->firstBlockNumber == -1L)
    1032 UIC           0 :         lt->nextBlockNumber = -1L;
    1033 GNC         215 :     ltsReadBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
    1034 CBC         215 :     if (TapeBlockIsLast(lt->buffer))
    1035 GIC         181 :         lt->nextBlockNumber = -1L;
    1036 ECB             :     else
    1037 CBC          34 :         lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
    1038 GIC         215 :     lt->nbytes = TapeBlockGetNBytes(lt->buffer);
    1039 ECB             : 
    1040                 :     /* Handle extra steps when caller is to share its tapeset */
    1041 GIC         215 :     if (share)
    1042                 :     {
    1043             206 :         BufFileExportFileSet(lts->pfile);
    1044             206 :         share->firstblocknumber = lt->firstBlockNumber;
    1045                 :     }
    1046             215 : }
    1047                 : 
    1048                 : /*
    1049                 :  * Backspace the tape a given number of bytes.  (We also support a more
    1050                 :  * general seek interface, see below.)
    1051                 :  *
    1052                 :  * *Only* a frozen-for-read tape can be backed up; we don't support
    1053                 :  * random access during write, and an unfrozen read tape may have
    1054                 :  * already discarded the desired data!
    1055 ECB             :  *
    1056                 :  * Returns the number of bytes backed up.  It can be less than the
    1057                 :  * requested amount, if there isn't that much data before the current
    1058                 :  * position.  The tape is positioned to the beginning of the tape in
    1059                 :  * that case.
    1060                 :  */
    1061                 : size_t
    1062 CBC          36 : LogicalTapeBackspace(LogicalTape *lt, size_t size)
    1063 EUB             : {
    1064 GIC          36 :     size_t      seekpos = 0;
    1065                 : 
    1066              36 :     Assert(lt->frozen);
    1067              36 :     Assert(lt->buffer_size == BLCKSZ);
    1068 ECB             : 
    1069 GIC          36 :     if (lt->buffer == NULL)
    1070 LBC           0 :         ltsInitReadBuffer(lt);
    1071 ECB             : 
    1072                 :     /*
    1073                 :      * Easy case for seek within current block.
    1074                 :      */
    1075 GIC          36 :     if (size <= (size_t) lt->pos)
    1076                 :     {
    1077              33 :         lt->pos -= (int) size;
    1078              33 :         return size;
    1079 ECB             :     }
    1080                 : 
    1081                 :     /*
    1082                 :      * Not-so-easy case, have to walk back the chain of blocks.  This
    1083                 :      * implementation would be pretty inefficient for long seeks, but we
    1084                 :      * really aren't doing that (a seek over one tuple is typical).
    1085                 :      */
    1086 GIC           3 :     seekpos = (size_t) lt->pos; /* part within this block */
    1087 CBC           3 :     while (size > seekpos)
    1088 EUB             :     {
    1089 CBC           3 :         long        prev = TapeBlockGetTrailer(lt->buffer)->prev;
    1090 ECB             : 
    1091 GIC           3 :         if (prev == -1L)
    1092                 :         {
    1093 EUB             :             /* Tried to back up beyond the beginning of tape. */
    1094 GIC           3 :             if (lt->curBlockNumber != lt->firstBlockNumber)
    1095 UBC           0 :                 elog(ERROR, "unexpected end of tape");
    1096 GBC           3 :             lt->pos = 0;
    1097 GIC           3 :             return seekpos;
    1098                 :         }
    1099                 : 
    1100 UNC           0 :         ltsReadBlock(lt->tapeSet, prev, lt->buffer);
    1101 EUB             : 
    1102 UBC           0 :         if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
    1103               0 :             elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
    1104                 :                  prev,
    1105 EUB             :                  TapeBlockGetTrailer(lt->buffer)->next,
    1106                 :                  lt->curBlockNumber);
    1107                 : 
    1108 UIC           0 :         lt->nbytes = TapeBlockPayloadSize;
    1109               0 :         lt->curBlockNumber = prev;
    1110               0 :         lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
    1111                 : 
    1112               0 :         seekpos += TapeBlockPayloadSize;
    1113 EUB             :     }
    1114                 : 
    1115                 :     /*
    1116                 :      * 'seekpos' can now be greater than 'size', because it points to the
    1117                 :      * beginning of the target block.  The difference is the position within the
    1118                 :      * page.
    1119                 :      */
    1120 UIC           0 :     lt->pos = seekpos - size;
    1121               0 :     return size;
    1122                 : }
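
The backspace logic above can be sketched outside PostgreSQL as a toy model. This is only an illustration under stated assumptions: `ToyBlock`, `ToyTape`, `toy_backspace`, and `TOY_PAYLOAD` are hypothetical names, the blocks live in an in-memory array instead of being fetched with ltsReadBlock(), and the "broken tape" consistency checks are omitted.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_PAYLOAD 8       /* hypothetical stand-in for TapeBlockPayloadSize */

/* Hypothetical block trailer: links to neighbours in the chain, -1 if none. */
typedef struct ToyBlock
{
    long        prev;
    long        next;
} ToyBlock;

/* Hypothetical read state of a frozen tape. */
typedef struct ToyTape
{
    const ToyBlock *blocks; /* all blocks, indexed by block number */
    long        curBlock;   /* block currently "loaded" */
    size_t      pos;        /* read offset within curBlock */
} ToyTape;

/* Back up by 'size' bytes; returns the number of bytes actually backed up. */
static size_t
toy_backspace(ToyTape *t, size_t size)
{
    size_t      seekpos;

    /* Easy case: the target lies within the current block. */
    if (size <= t->pos)
    {
        t->pos -= size;
        return size;
    }

    /* Otherwise walk back the chain, one full block per step. */
    seekpos = t->pos;       /* part within this block */
    while (size > seekpos)
    {
        long        prev = t->blocks[t->curBlock].prev;

        if (prev == -1)
        {
            /* Reached the beginning of the tape: report a short backspace. */
            t->pos = 0;
            return seekpos;
        }
        t->curBlock = prev;
        seekpos += TOY_PAYLOAD;
    }

    /* seekpos now measures back to the start of the target block. */
    t->pos = seekpos - size;
    return size;
}
```

With a three-block chain and the tape at block 2, offset 1, backing up 5 bytes lands on block 1 at offset 4; a request larger than the data before the current position stops at block 0, offset 0, and returns only the distance actually covered.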
    1123                 : 
    1124                 : /*
    1125                 :  * Seek to an arbitrary position in a logical tape.
    1126 ECB             :  *
    1127                 :  * *Only* a frozen-for-read tape can be seeked.
    1128                 :  *
    1129                 :  * Must be called with a block/offset previously returned by
    1130                 :  * LogicalTapeTell().
    1131                 :  */
    1132                 : void
    1133 GBC        3096 : LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
    1134                 : {
    1135 CBC        3096 :     Assert(lt->frozen);
    1136 GIC        3096 :     Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
    1137 CBC        3096 :     Assert(lt->buffer_size == BLCKSZ);
    1138 ECB             : 
    1139 CBC        3096 :     if (lt->buffer == NULL)
    1140 LBC           0 :         ltsInitReadBuffer(lt);
    1141                 : 
    1142 GIC        3096 :     if (blocknum != lt->curBlockNumber)
    1143 ECB             :     {
    1144 GNC          39 :         ltsReadBlock(lt->tapeSet, blocknum, lt->buffer);
    1145 CBC          39 :         lt->curBlockNumber = blocknum;
    1146              39 :         lt->nbytes = TapeBlockPayloadSize;
    1147 GIC          39 :         lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
    1148                 :     }
    1149                 : 
    1150            3096 :     if (offset > lt->nbytes)
    1151 UIC           0 :         elog(ERROR, "invalid tape seek position");
    1152 GIC        3096 :     lt->pos = offset;
    1153            3096 : }
    1154                 : 
    1155 ECB             : /*
    1156                 :  * Obtain current position in a form suitable for a later LogicalTapeSeek.
    1157                 :  *
    1158 EUB             :  * NOTE: it'd be OK to do this during write phase with intention of using
    1159                 :  * the position for a seek after freezing.  Not clear if anyone needs that.
    1160 ECB             :  */
    1161                 : void
    1162 GIC        4404 : LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
    1163 ECB             : {
    1164 GIC        4404 :     if (lt->buffer == NULL)
    1165 LBC           0 :         ltsInitReadBuffer(lt);
    1166 ECB             : 
    1167 CBC        4404 :     Assert(lt->offsetBlockNumber == 0L);
    1168                 : 
    1169                 :     /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
    1170 GIC        4404 :     Assert(lt->buffer_size == BLCKSZ);
    1171                 : 
    1172            4404 :     *blocknum = lt->curBlockNumber;
    1173            4404 :     *offset = lt->pos;
    1174            4404 : }
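
The tell/seek contract above (a position obtained from LogicalTapeTell() is later handed back to LogicalTapeSeek()) can be illustrated with a small stand-alone sketch. All names here (`ToyReader`, `toy_tell`, `toy_seek`, `TOY_SEEK_PAYLOAD`) are hypothetical; block loading is reduced to a counter, and an invalid offset returns -1 where the real code raises an error with elog().

```c
#include <assert.h>

#define TOY_SEEK_PAYLOAD 8  /* hypothetical stand-in for TapeBlockPayloadSize */

/* Hypothetical read state: which block is loaded and where we are in it. */
typedef struct ToyReader
{
    long        curBlock;   /* block currently in the buffer */
    int         pos;        /* read offset within that block */
    int         nbytes;     /* valid bytes in that block */
    long        loads;      /* how many times a block had to be loaded */
} ToyReader;

/* Record the current position as a (block, offset) pair. */
static void
toy_tell(const ToyReader *r, long *blocknum, int *offset)
{
    *blocknum = r->curBlock;
    *offset = r->pos;
}

/* Return to a recorded position; 0 on success, -1 on an invalid offset. */
static int
toy_seek(ToyReader *r, long blocknum, int offset)
{
    assert(offset >= 0 && offset <= TOY_SEEK_PAYLOAD);

    /* Only touch "disk" when the target is not the block already loaded. */
    if (blocknum != r->curBlock)
    {
        r->loads++;                     /* stands in for ltsReadBlock() */
        r->curBlock = blocknum;
        r->nbytes = TOY_SEEK_PAYLOAD;   /* treated as a full block, as above */
    }

    if (offset > r->nbytes)
        return -1;
    r->pos = offset;
    return 0;
}
```

Seeking within the block that is already loaded costs no load in this model, which is consistent with the hit counts in the listing: 3096 calls to LogicalTapeSeek but only 39 executions of the block-reading branch.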
    1175                 : 
    1176 ECB             : /*
    1177                 :  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
    1178                 :  *
    1179                 :  * This should not be called while there are open write buffers; otherwise it
    1180                 :  * may not account for buffered data.
    1181                 :  */
    1182                 : long
    1183 GIC       13873 : LogicalTapeSetBlocks(LogicalTapeSet *lts)
    1184                 : {
    1185           13873 :     return lts->nBlocksWritten - lts->nHoleBlocks;
    1186                 : }
        
