LCOV - differential code coverage report
Current view: top level - src/backend/utils/sort - sharedtuplestore.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 92.5 % 187 173 2 4 8 2 40 6 125 2 25 2 21
Current Date: 2023-04-08 15:15:32 Functions: 91.7 % 12 11 1 2 1 8 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * sharedtuplestore.c
       4                 :  *    Simple mechanism for sharing tuples between backends.
       5                 :  *
       6                 :  * This module contains a shared temporary tuple storage mechanism providing
       7                 :  * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
       8                 :  * can write to a SharedTuplestore, and then multiple backends can later scan
       9                 :  * the stored tuples.  Currently, the only scan type supported is a parallel
      10                 :  * scan where each backend reads an arbitrary subset of the tuples that were
      11                 :  * written.
      12                 :  *
      13                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      14                 :  * Portions Copyright (c) 1994, Regents of the University of California
      15                 :  *
      16                 :  * IDENTIFICATION
      17                 :  *    src/backend/utils/sort/sharedtuplestore.c
      18                 :  *
      19                 :  *-------------------------------------------------------------------------
      20                 :  */
      21                 : 
      22                 : #include "postgres.h"
      23                 : 
      24                 : #include "access/htup.h"
      25                 : #include "access/htup_details.h"
      26                 : #include "miscadmin.h"
      27                 : #include "storage/buffile.h"
      28                 : #include "storage/lwlock.h"
      29                 : #include "storage/sharedfileset.h"
      30                 : #include "utils/sharedtuplestore.h"
      31                 : 
      32                 : /*
      33                 :  * The size of chunks, in pages.  This is somewhat arbitrarily set to match
      34                 :  * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
      35                 :  * at approximately the same rate as it allocates new chunks of memory to
      36                 :  * insert them into.
      37                 :  */
      38                 : #define STS_CHUNK_PAGES 4
      39                 : #define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
      40                 : #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
      41                 : 
/*
 * Chunk written to disk.  A chunk is always written as a whole
 * (STS_CHUNK_PAGES * BLCKSZ bytes); the two header fields are followed by
 * the chunk's payload in data[].
 */
typedef struct SharedTuplestoreChunk
{
    int         ntuples;        /* Number of tuples that begin in this chunk
                                 * (for an overflow chunk: the regular tuples
                                 * following the oversized one). */
    int         overflow;       /* 0 for a regular chunk; for an overflow
                                 * chunk, how many overflow chunks remain for
                                 * the current tuple, including this one. */
    char        data[FLEXIBLE_ARRAY_MEMBER];    /* Per tuple: meta-data then
                                                 * MinimalTuple, or the
                                                 * continuation bytes of an
                                                 * oversized tuple. */
} SharedTuplestoreChunk;
      49                 : 
/*
 * Per-participant shared state.  One of these exists for each participant,
 * in SharedTuplestore.participants[].
 */
typedef struct SharedTuplestoreParticipant
{
    LWLock      lock;           /* Held by scanners while claiming the next
                                 * chunk (reading npages, advancing
                                 * read_page). */
    BlockNumber read_page;      /* Page number for next read (shared read
                                 * head, advanced in STS_CHUNK_PAGES steps). */
    BlockNumber npages;         /* Number of pages written (grows by
                                 * STS_CHUNK_PAGES per flushed chunk). */
    bool        writing;        /* Used only for assertions. */
} SharedTuplestoreParticipant;
      58                 : 
/*
 * The control object that lives in shared memory.  Its total size for a
 * given participant count is reported by sts_estimate().
 */
struct SharedTuplestore
{
    int         nparticipants;  /* Number of participants that can write. */
    int         flags;          /* Flag bits from SHARED_TUPLESTORE_XXX */
    size_t      meta_data_size; /* Size of per-tuple meta-data header; 0 if
                                 * tuples carry no meta-data. */
    char        name[NAMEDATALEN];  /* A name for this tuplestore; must be
                                     * unique within its SharedFileSet. */

    /* Followed by per-participant shared state. */
    SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};
      70                 : 
/*
 * Per-participant state that lives in backend-local memory.  Created by
 * sts_initialize() or sts_attach().
 */
struct SharedTuplestoreAccessor
{
    int         participant;    /* My participant number. */
    SharedTuplestore *sts;      /* The shared state. */
    SharedFileSet *fileset;     /* The SharedFileSet holding files. */
    MemoryContext context;      /* Memory context for buffers (the
                                 * CurrentMemoryContext at creation). */

    /* State for reading. */
    int         read_participant;   /* The current participant to read from. */
    BufFile    *read_file;      /* The current file to read from, or NULL. */
    int         read_ntuples_available; /* The number of tuples in chunk. */
    int         read_ntuples;   /* How many tuples have we read from chunk? */
    size_t      read_bytes;     /* How many bytes have we read from chunk? */
    char       *read_buffer;    /* A buffer for loading tuples. */
    size_t      read_buffer_size;   /* Allocated size of read_buffer. */
    BlockNumber read_next_page; /* Lowest block we'll consider reading. */

    /* State for writing. */
    SharedTuplestoreChunk *write_chunk; /* Buffer for writing; allocated on
                                         * first write, freed by
                                         * sts_end_write(). */
    BufFile    *write_file;     /* The current file to write to. */
    BlockNumber write_page;     /* The next page to write to.  NOTE(review):
                                 * not referenced by the code visible here --
                                 * confirm whether it is still used. */
    char       *write_pointer;  /* Current write pointer within chunk. */
    char       *write_end;      /* One past the end of the current chunk. */
};
      96                 : 
/*
 * Build into name the file name used for the given participant's BufFile
 * within the accessor's SharedFileSet.  Callers pass a MAXPGPATH-sized
 * buffer.
 */
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
                         int participant);
      99                 : 
     100                 : /*
     101                 :  * Return the amount of shared memory required to hold SharedTuplestore for a
     102                 :  * given number of participants.
     103                 :  */
     104                 : size_t
     105 CBC        2302 : sts_estimate(int participants)
     106                 : {
     107            4604 :     return offsetof(SharedTuplestore, participants) +
     108            2302 :         sizeof(SharedTuplestoreParticipant) * participants;
     109                 : }
     110                 : 
     111                 : /*
     112                 :  * Initialize a SharedTuplestore in existing shared memory.  There must be
     113                 :  * space for sts_estimate(participants) bytes.  If flags includes the value
     114                 :  * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
     115                 :  * eagerly (but this isn't yet implemented).
     116                 :  *
     117                 :  * Tuples that are stored may optionally carry a piece of fixed sized
     118                 :  * meta-data which will be retrieved along with the tuple.  This is useful for
     119                 :  * the hash values used in multi-batch hash joins, but could have other
     120                 :  * applications.
     121                 :  *
     122                 :  * The caller must supply a SharedFileSet, which is essentially a directory
     123                 :  * that will be cleaned up automatically, and a name which must be unique
     124                 :  * across all SharedTuplestores created in the same SharedFileSet.
     125                 :  */
     126                 : SharedTuplestoreAccessor *
     127             972 : sts_initialize(SharedTuplestore *sts, int participants,
     128                 :                int my_participant_number,
     129                 :                size_t meta_data_size,
     130                 :                int flags,
     131                 :                SharedFileSet *fileset,
     132                 :                const char *name)
     133                 : {
     134                 :     SharedTuplestoreAccessor *accessor;
     135                 :     int         i;
     136                 : 
     137             972 :     Assert(my_participant_number < participants);
     138                 : 
     139             972 :     sts->nparticipants = participants;
     140             972 :     sts->meta_data_size = meta_data_size;
     141             972 :     sts->flags = flags;
     142                 : 
     143             972 :     if (strlen(name) > sizeof(sts->name) - 1)
     144 UBC           0 :         elog(ERROR, "SharedTuplestore name too long");
     145 CBC         972 :     strcpy(sts->name, name);
     146                 : 
     147                 :     /*
     148                 :      * Limit meta-data so it + tuple size always fits into a single chunk.
     149                 :      * sts_puttuple() and sts_read_tuple() could be made to support scenarios
     150                 :      * where that's not the case, but it's not currently required. If so,
     151                 :      * meta-data size probably should be made variable, too.
     152                 :      */
     153             972 :     if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
     154 UBC           0 :         elog(ERROR, "meta-data too long");
     155                 : 
     156 CBC        3682 :     for (i = 0; i < participants; ++i)
     157                 :     {
     158            2710 :         LWLockInitialize(&sts->participants[i].lock,
     159                 :                          LWTRANCHE_SHARED_TUPLESTORE);
     160            2710 :         sts->participants[i].read_page = 0;
     161            2710 :         sts->participants[i].npages = 0;
     162            2710 :         sts->participants[i].writing = false;
     163                 :     }
     164                 : 
     165             972 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     166             972 :     accessor->participant = my_participant_number;
     167             972 :     accessor->sts = sts;
     168             972 :     accessor->fileset = fileset;
     169             972 :     accessor->context = CurrentMemoryContext;
     170                 : 
     171             972 :     return accessor;
     172                 : }
     173                 : 
     174                 : /*
     175                 :  * Attach to a SharedTuplestore that has been initialized by another backend,
     176                 :  * so that this backend can read and write tuples.
     177                 :  */
     178                 : SharedTuplestoreAccessor *
     179            1221 : sts_attach(SharedTuplestore *sts,
     180                 :            int my_participant_number,
     181                 :            SharedFileSet *fileset)
     182                 : {
     183                 :     SharedTuplestoreAccessor *accessor;
     184                 : 
     185            1221 :     Assert(my_participant_number < sts->nparticipants);
     186                 : 
     187            1221 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     188            1221 :     accessor->participant = my_participant_number;
     189            1221 :     accessor->sts = sts;
     190            1221 :     accessor->fileset = fileset;
     191            1221 :     accessor->context = CurrentMemoryContext;
     192                 : 
     193            1221 :     return accessor;
     194                 : }
     195                 : 
     196                 : static void
     197            1964 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
     198                 : {
     199                 :     size_t      size;
     200                 : 
     201            1964 :     size = STS_CHUNK_PAGES * BLCKSZ;
     202            1964 :     BufFileWrite(accessor->write_file, accessor->write_chunk, size);
     203            1964 :     memset(accessor->write_chunk, 0, size);
     204            1964 :     accessor->write_pointer = &accessor->write_chunk->data[0];
     205            1964 :     accessor->sts->participants[accessor->participant].npages +=
     206                 :         STS_CHUNK_PAGES;
     207            1964 : }
     208                 : 
     209                 : /*
     210                 :  * Finish writing tuples.  This must be called by all backends that have
     211                 :  * written data before any backend begins reading it.
     212                 :  */
     213                 : void
     214            3507 : sts_end_write(SharedTuplestoreAccessor *accessor)
     215                 : {
     216            3507 :     if (accessor->write_file != NULL)
     217                 :     {
     218            1193 :         sts_flush_chunk(accessor);
     219            1193 :         BufFileClose(accessor->write_file);
     220            1193 :         pfree(accessor->write_chunk);
     221            1193 :         accessor->write_chunk = NULL;
     222            1193 :         accessor->write_file = NULL;
     223            1193 :         accessor->sts->participants[accessor->participant].writing = false;
     224                 :     }
     225            3507 : }
     226                 : 
     227                 : /*
     228                 :  * Prepare to rescan.  Only one participant must call this.  After it returns,
     229                 :  * all participants may call sts_begin_parallel_scan() and then loop over
     230                 :  * sts_parallel_scan_next().  This function must not be called concurrently
     231                 :  * with a scan, and synchronization to avoid that is the caller's
     232                 :  * responsibility.
     233                 :  */
     234                 : void
     235 UBC           0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
     236                 : {
     237                 :     int         i;
     238                 : 
     239                 :     /*
     240                 :      * Reset the shared read head for all participants' files.  Also set the
     241                 :      * initial chunk size to the minimum (any increases from that size will be
     242                 :      * recorded in chunk_expansion_log).
     243                 :      */
     244               0 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     245                 :     {
     246               0 :         accessor->sts->participants[i].read_page = 0;
     247                 :     }
     248               0 : }
     249                 : 
     250                 : /*
     251                 :  * Begin scanning the contents in parallel.
     252                 :  */
     253                 : void
     254 CBC         894 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
     255                 : {
     256                 :     int         i PG_USED_FOR_ASSERTS_ONLY;
     257                 : 
     258                 :     /* End any existing scan that was in progress. */
     259             894 :     sts_end_parallel_scan(accessor);
     260                 : 
     261                 :     /*
     262                 :      * Any backend that might have written into this shared tuplestore must
     263                 :      * have called sts_end_write(), so that all buffers are flushed and the
     264                 :      * files have stopped growing.
     265                 :      */
     266            3410 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     267            2516 :         Assert(!accessor->sts->participants[i].writing);
     268                 : 
     269                 :     /*
     270                 :      * We will start out reading the file that THIS backend wrote.  There may
     271                 :      * be some caching locality advantage to that.
     272                 :      */
     273             894 :     accessor->read_participant = accessor->participant;
     274             894 :     accessor->read_file = NULL;
     275             894 :     accessor->read_next_page = 0;
     276             894 : }
     277                 : 
     278                 : /*
     279                 :  * Finish a parallel scan, freeing associated backend-local resources.
     280                 :  */
     281                 : void
     282            4431 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
     283                 : {
     284                 :     /*
     285                 :      * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     286                 :      * we'd probably need a reference count of current parallel scanners so we
     287                 :      * could safely do it only when the reference count reaches zero.
     288                 :      */
     289            4431 :     if (accessor->read_file != NULL)
     290                 :     {
     291 UBC           0 :         BufFileClose(accessor->read_file);
     292               0 :         accessor->read_file = NULL;
     293                 :     }
     294 CBC        4431 : }
     295                 : 
     296                 : /*
     297                 :  * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
     298                 :  * pointer to meta data of that size must be provided.
     299                 :  */
     300                 : void
     301         1250649 : sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
     302                 :              MinimalTuple tuple)
     303                 : {
     304                 :     size_t      size;
     305                 : 
     306                 :     /* Do we have our own file yet? */
     307         1250649 :     if (accessor->write_file == NULL)
     308                 :     {
     309                 :         SharedTuplestoreParticipant *participant;
     310                 :         char        name[MAXPGPATH];
     311                 : 
     312                 :         /* Create one.  Only this backend will write into it. */
     313            1193 :         sts_filename(name, accessor, accessor->participant);
     314            1193 :         accessor->write_file =
     315            1193 :             BufFileCreateFileSet(&accessor->fileset->fs, name);
     316                 : 
     317                 :         /* Set up the shared state for this backend's file. */
     318            1193 :         participant = &accessor->sts->participants[accessor->participant];
     319            1193 :         participant->writing = true; /* for assertions only */
     320                 :     }
     321                 : 
     322                 :     /* Do we have space? */
     323         1250649 :     size = accessor->sts->meta_data_size + tuple->t_len;
     324         1250649 :     if (accessor->write_pointer + size > accessor->write_end)
     325                 :     {
     326            1856 :         if (accessor->write_chunk == NULL)
     327                 :         {
     328                 :             /* First time through.  Allocate chunk. */
     329            1193 :             accessor->write_chunk = (SharedTuplestoreChunk *)
     330            1193 :                 MemoryContextAllocZero(accessor->context,
     331                 :                                        STS_CHUNK_PAGES * BLCKSZ);
     332            1193 :             accessor->write_chunk->ntuples = 0;
     333            1193 :             accessor->write_pointer = &accessor->write_chunk->data[0];
     334            1193 :             accessor->write_end = (char *)
     335            1193 :                 accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
     336                 :         }
     337                 :         else
     338                 :         {
     339                 :             /* See if flushing helps. */
     340             663 :             sts_flush_chunk(accessor);
     341                 :         }
     342                 : 
     343                 :         /* It may still not be enough in the case of a gigantic tuple. */
     344            1856 :         if (accessor->write_pointer + size > accessor->write_end)
     345                 :         {
     346                 :             size_t      written;
     347                 : 
     348                 :             /*
     349                 :              * We'll write the beginning of the oversized tuple, and then
     350                 :              * write the rest in some number of 'overflow' chunks.
     351                 :              *
     352                 :              * sts_initialize() verifies that the size of the tuple +
     353                 :              * meta-data always fits into a chunk. Because the chunk has been
     354                 :              * flushed above, we can be sure to have all of a chunk's usable
     355                 :              * space available.
     356                 :              */
     357              12 :             Assert(accessor->write_pointer + accessor->sts->meta_data_size +
     358                 :                    sizeof(uint32) < accessor->write_end);
     359                 : 
     360                 :             /* Write the meta-data as one chunk. */
     361              12 :             if (accessor->sts->meta_data_size > 0)
     362              12 :                 memcpy(accessor->write_pointer, meta_data,
     363              12 :                        accessor->sts->meta_data_size);
     364                 : 
     365                 :             /*
     366                 :              * Write as much of the tuple as we can fit. This includes the
     367                 :              * tuple's size at the start.
     368                 :              */
     369              12 :             written = accessor->write_end - accessor->write_pointer -
     370              12 :                 accessor->sts->meta_data_size;
     371              12 :             memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
     372                 :                    tuple, written);
     373              12 :             ++accessor->write_chunk->ntuples;
     374              12 :             size -= accessor->sts->meta_data_size;
     375              12 :             size -= written;
     376                 :             /* Now write as many overflow chunks as we need for the rest. */
     377             120 :             while (size > 0)
     378                 :             {
     379                 :                 size_t      written_this_chunk;
     380                 : 
     381             108 :                 sts_flush_chunk(accessor);
     382                 : 
     383                 :                 /*
     384                 :                  * How many overflow chunks to go?  This will allow readers to
     385                 :                  * skip all of them at once instead of reading each one.
     386                 :                  */
     387             108 :                 accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
     388                 :                     STS_CHUNK_DATA_SIZE;
     389             108 :                 written_this_chunk =
     390             108 :                     Min(accessor->write_end - accessor->write_pointer, size);
     391             108 :                 memcpy(accessor->write_pointer, (char *) tuple + written,
     392                 :                        written_this_chunk);
     393             108 :                 accessor->write_pointer += written_this_chunk;
     394             108 :                 size -= written_this_chunk;
     395             108 :                 written += written_this_chunk;
     396                 :             }
     397              12 :             return;
     398                 :         }
     399                 :     }
     400                 : 
     401                 :     /* Copy meta-data and tuple into buffer. */
     402         1250637 :     if (accessor->sts->meta_data_size > 0)
     403         1250637 :         memcpy(accessor->write_pointer, meta_data,
     404         1250637 :                accessor->sts->meta_data_size);
     405         1250637 :     memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
     406         1250637 :            tuple->t_len);
     407         1250637 :     accessor->write_pointer += size;
     408         1250637 :     ++accessor->write_chunk->ntuples;
     409                 : }
     410                 : 
     411                 : static MinimalTuple
     412         1250649 : sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
     413                 : {
     414                 :     MinimalTuple tuple;
     415                 :     uint32      size;
     416                 :     size_t      remaining_size;
     417                 :     size_t      this_chunk_size;
     418                 :     char       *destination;
     419                 : 
     420                 :     /*
     421                 :      * We'll keep track of bytes read from this chunk so that we can detect an
     422                 :      * overflowing tuple and switch to reading overflow pages.
     423                 :      */
     424         1250649 :     if (accessor->sts->meta_data_size > 0)
     425                 :     {
     426 GNC     1250649 :         BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
     427 GIC     1250649 :         accessor->read_bytes += accessor->sts->meta_data_size;
     428 ECB             :     }
     429 GNC     1250649 :     BufFileReadExact(accessor->read_file, &size, sizeof(size));
     430 CBC     1250649 :     accessor->read_bytes += sizeof(size);
     431 GIC     1250649 :     if (size > accessor->read_buffer_size)
     432 ECB             :     {
     433                 :         size_t      new_read_buffer_size;
     434                 : 
     435 CBC         721 :         if (accessor->read_buffer != NULL)
     436 LBC           0 :             pfree(accessor->read_buffer);
     437 CBC         721 :         new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
     438 GIC         721 :         accessor->read_buffer =
     439             721 :             MemoryContextAlloc(accessor->context, new_read_buffer_size);
     440 CBC         721 :         accessor->read_buffer_size = new_read_buffer_size;
     441                 :     }
     442 GIC     1250649 :     remaining_size = size - sizeof(uint32);
     443         1250649 :     this_chunk_size = Min(remaining_size,
     444                 :                           BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
     445 CBC     1250649 :     destination = accessor->read_buffer + sizeof(uint32);
     446 GNC     1250649 :     BufFileReadExact(accessor->read_file, destination, this_chunk_size);
     447 CBC     1250649 :     accessor->read_bytes += this_chunk_size;
     448 GIC     1250649 :     remaining_size -= this_chunk_size;
     449         1250649 :     destination += this_chunk_size;
     450 CBC     1250649 :     ++accessor->read_ntuples;
     451 ECB             : 
     452                 :     /* Check if we need to read any overflow chunks. */
     453 CBC     1250757 :     while (remaining_size > 0)
     454                 :     {
     455                 :         /* We are now positioned at the start of an overflow chunk. */
     456                 :         SharedTuplestoreChunk chunk_header;
     457                 : 
     458 GNC         108 :         BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
     459 CBC         108 :         accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     460 GIC         108 :         if (chunk_header.overflow == 0)
     461 LBC           0 :             ereport(ERROR,
     462                 :                     (errcode_for_file_access(),
     463                 :                      errmsg("unexpected chunk in shared tuplestore temporary file"),
     464                 :                      errdetail_internal("Expected overflow chunk.")));
     465 GIC         108 :         accessor->read_next_page += STS_CHUNK_PAGES;
     466             108 :         this_chunk_size = Min(remaining_size,
     467                 :                               BLCKSZ * STS_CHUNK_PAGES -
     468 ECB             :                               STS_CHUNK_HEADER_SIZE);
     469 GNC         108 :         BufFileReadExact(accessor->read_file, destination, this_chunk_size);
     470 GIC         108 :         accessor->read_bytes += this_chunk_size;
     471 CBC         108 :         remaining_size -= this_chunk_size;
     472             108 :         destination += this_chunk_size;
     473                 : 
     474                 :         /*
     475 ECB             :          * These will be used to count regular tuples following the oversized
     476                 :          * tuple that spilled into this overflow chunk.
     477                 :          */
     478 GIC         108 :         accessor->read_ntuples = 0;
     479 CBC         108 :         accessor->read_ntuples_available = chunk_header.ntuples;
     480 ECB             :     }
     481                 : 
     482 CBC     1250649 :     tuple = (MinimalTuple) accessor->read_buffer;
     483 GIC     1250649 :     tuple->t_len = size;
     484                 : 
     485 CBC     1250649 :     return tuple;
     486                 : }
     487 ECB             : 
     488                 : /*
     489                 :  * Get the next tuple in the current parallel scan.
     490                 :  */
     491                 : MinimalTuple
     492 CBC     1251472 : sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
     493                 : {
     494                 :     SharedTuplestoreParticipant *p;
     495                 :     BlockNumber read_page;
     496                 :     bool        eof;
     497 ECB             : 
     498                 :     for (;;)
     499                 :     {
     500                 :         /* Can we read more tuples from the current chunk? */
     501 CBC     1254811 :         if (accessor->read_ntuples < accessor->read_ntuples_available)
     502         1250649 :             return sts_read_tuple(accessor, meta_data);
     503 ECB             : 
     504                 :         /* Find the location of a new chunk to read. */
     505 GIC        4162 :         p = &accessor->sts->participants[accessor->read_participant];
     506                 : 
     507            4162 :         LWLockAcquire(&p->lock, LW_EXCLUSIVE);
     508 ECB             :         /* We can skip directly past overflow pages we know about. */
     509 GBC        4162 :         if (p->read_page < accessor->read_next_page)
     510 GIC          12 :             p->read_page = accessor->read_next_page;
     511            4162 :         eof = p->read_page >= p->npages;
     512            4162 :         if (!eof)
     513 ECB             :         {
     514                 :             /* Claim the next chunk. */
     515 GIC        1856 :             read_page = p->read_page;
     516                 :             /* Advance the read head for the next reader. */
     517            1856 :             p->read_page += STS_CHUNK_PAGES;
     518            1856 :             accessor->read_next_page = p->read_page;
     519 ECB             :         }
     520 GIC        4162 :         LWLockRelease(&p->lock);
     521 EUB             : 
     522 GBC        4162 :         if (!eof)
     523 EUB             :         {
     524                 :             SharedTuplestoreChunk chunk_header;
     525 ECB             : 
     526                 :             /* Make sure we have the file open. */
     527 CBC        1856 :             if (accessor->read_file == NULL)
     528                 :             {
     529                 :                 char        name[MAXPGPATH];
     530                 : 
     531 GIC        1226 :                 sts_filename(name, accessor, accessor->read_participant);
     532            1226 :                 accessor->read_file =
     533 CBC        1226 :                     BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
     534                 :                                        false);
     535 ECB             :             }
     536                 : 
     537                 :             /* Seek and load the chunk header. */
     538 GIC        1856 :             if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
     539 UIC           0 :                 ereport(ERROR,
     540                 :                         (errcode_for_file_access(),
     541                 :                          errmsg("could not seek to block %u in shared tuplestore temporary file",
     542                 :                                 read_page)));
     543 GNC        1856 :             BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
     544                 : 
     545                 :             /*
     546                 :              * If this is an overflow chunk, we skip it and any following
     547 ECB             :              * overflow chunks all at once.
     548                 :              */
     549 GIC        1856 :             if (chunk_header.overflow > 0)
     550                 :             {
     551 UIC           0 :                 accessor->read_next_page = read_page +
     552               0 :                     chunk_header.overflow * STS_CHUNK_PAGES;
     553               0 :                 continue;
     554 ECB             :             }
     555                 : 
     556 CBC        1856 :             accessor->read_ntuples = 0;
     557            1856 :             accessor->read_ntuples_available = chunk_header.ntuples;
     558 GIC        1856 :             accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     559                 : 
     560                 :             /* Go around again, so we can get a tuple from this chunk. */
     561                 :         }
     562                 :         else
     563                 :         {
     564            2306 :             if (accessor->read_file != NULL)
     565                 :             {
     566            1226 :                 BufFileClose(accessor->read_file);
     567            1226 :                 accessor->read_file = NULL;
     568                 :             }
     569                 : 
     570                 :             /*
     571                 :              * Try the next participant's file.  If we've gone full circle,
     572                 :              * we're done.
     573                 :              */
     574            2306 :             accessor->read_participant = (accessor->read_participant + 1) %
     575            2306 :                 accessor->sts->nparticipants;
     576            2306 :             if (accessor->read_participant == accessor->participant)
     577             823 :                 break;
     578            1483 :             accessor->read_next_page = 0;
     579                 : 
     580                 :             /* Go around again, so we can get a chunk from this file. */
     581                 :         }
     582                 :     }
     583                 : 
     584             823 :     return NULL;
     585                 : }
     586                 : 
     587                 : /*
     588                 :  * Create the name used for the BufFile that a given participant will write.
     589                 :  */
     590                 : static void
     591            2419 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
     592                 : {
     593            2419 :     snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
     594            2419 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a