LCOV - differential code coverage report
Current view:  top level - src/backend/storage/aio - read_stream.c
Current:       Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128
Current Date:  2024-04-14 14:21:10
Baseline:      16@8cea358b128
Baseline Date: 2024-04-14 14:21:09
Legend:        Lines: hit, not hit | Branches: + taken, - not taken, # not executed

                                Coverage   Total   Hit   UNC   GNC
Lines:                            89.8 %     245   220    25   220
Functions:                        90.0 %      10     9     1     9
Branches:                         71.9 %     192   138    54   138
Line coverage date bins:
  [..60] days:                    89.8 %     245   220    25   220
Function coverage date bins:
  [..60] days:                    90.0 %      10     9     1     9
Branch coverage date bins:
  [..60] days:                    71.9 %     192   138    54   138

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * read_stream.c
                                  4                 :                :  *    Mechanism for accessing buffered relation data with look-ahead
                                  5                 :                :  *
                                  6                 :                :  * Code that needs to access relation data typically pins blocks one at a
                                  7                 :                :  * time, often in a predictable order that might be sequential or data-driven.
                                  8                 :                :  * Calling the simple ReadBuffer() function for each block is inefficient,
                                  9                 :                :  * because blocks that are not yet in the buffer pool require I/O operations
                                 10                 :                :  * that are small and might stall waiting for storage.  This mechanism looks
                                 11                 :                :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
                                 12                 :                :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
                                 13                 :                :  * distance.
                                 14                 :                :  *
                                 15                 :                :  * A user-provided callback generates a stream of block numbers that is used
                                 16                 :                :  * to form reads of up to io_combine_limit, by attempting to merge them with a
                                 17                 :                :  * pending read.  When that isn't possible, the existing pending read is sent
                                 18                 :                :  * to StartReadBuffers() so that a new one can begin to form.
                                 19                 :                :  *
                                 20                 :                :  * The algorithm for controlling the look-ahead distance tries to classify the
                                 21                 :                :  * stream into three ideal behaviors:
                                 22                 :                :  *
                                 23                 :                :  * A) No I/O is necessary, because the requested blocks are fully cached
                                 24                 :                :  * already.  There is no benefit to looking ahead more than one block, so
                                 25                 :                :  * distance is 1.  This is the default initial assumption.
                                 26                 :                :  *
                                 27                 :                :  * B) I/O is necessary, but fadvise is undesirable because the access is
                                 28                 :                :  * sequential, or impossible because direct I/O is enabled or the system
                                 29                 :                :  * doesn't support advice.  There is no benefit in looking ahead more than
                                 30                 :                :  * io_combine_limit, because in this case the only goal is larger read system
                                 31                 :                :  * calls.  Looking further ahead would pin many buffers and perform
                                 32                 :                :  * speculative work looking ahead for no benefit.
                                 33                 :                :  *
                                 34                 :                :  * C) I/O is necessary, it appears random, and this system supports fadvise.
                                 35                 :                :  * We'll look further ahead in order to reach the configured level of I/O
                                 36                 :                :  * concurrency.
                                 37                 :                :  *
                                 38                 :                :  * The distance increases rapidly and decays slowly, so that it moves towards
                                 39                 :                :  * those levels as different I/O patterns are discovered.  For example, a
                                 40                 :                :  * sequential scan of fully cached data doesn't bother looking ahead, but a
                                 41                 :                :  * sequential scan that hits a region of uncached blocks will start issuing
                                 42                 :                :  * increasingly wide read calls until it plateaus at io_combine_limit.
                                 43                 :                :  *
                                 44                 :                :  * The main data structure is a circular queue of buffers of size
                                 45                 :                :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
                                 46                 :                :  * returned by read_stream_next_buffer().  Each buffer also has an optional
                                 47                 :                :  * variable sized object that is passed from the callback to the consumer of
                                 48                 :                :  * buffers.
                                 49                 :                :  *
                                 50                 :                :  * Parallel to the queue of buffers, there is a circular queue of in-progress
                                 51                 :                :  * I/Os that have been started with StartReadBuffers(), and for which
                                 52                 :                :  * WaitReadBuffers() must be called before returning the buffer.
                                 53                 :                :  *
                                 54                 :                :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
                                 55                 :                :  * successive calls, then these data structures might appear as follows:
                                 56                 :                :  *
                                 57                 :                :  *                          buffers buf/data       ios
                                 58                 :                :  *
                                 59                 :                :  *                          +----+  +-----+       +--------+
                                 60                 :                :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
                                 61                 :                :  *                          +----+  +-----+  |    +--------+
                                 62                 :                :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
                                 63                 :                :  *                          +----+  +-----+  | |  +--------+
                                 64                 :                :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
                                 65                 :                :  *                          +----+  +-----+    |  +--------+
                                 66                 :                :  *                          | 43 |  |  ?  |    |  |        |
                                 67                 :                :  *                          +----+  +-----+    |  +--------+
                                 68                 :                :  *                          | 44 |  |  ?  |    |  |        |
                                 69                 :                :  *                          +----+  +-----+    |  +--------+
                                 70                 :                :  *                          | 60 |  |  ?  |<---+
                                 71                 :                :  *                          +----+  +-----+
                                 72                 :                :  *     next_buffer_index -> |    |  |     |
                                 73                 :                :  *                          +----+  +-----+
                                 74                 :                :  *
                                 75                 :                :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
                                 76                 :                :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
                                 77                 :                :  * the range 42..44 requires an I/O wait before its buffers are returned, as
                                 78                 :                :  * does block 60.
                                 79                 :                :  *
                                 80                 :                :  *
                                 81                 :                :  * Portions Copyright (c) 2024, PostgreSQL Global Development Group
                                 82                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 83                 :                :  *
                                 84                 :                :  * IDENTIFICATION
                                 85                 :                :  *    src/backend/storage/aio/read_stream.c
                                 86                 :                :  *
                                 87                 :                :  *-------------------------------------------------------------------------
                                 88                 :                :  */
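
As a rough illustration of the read-combining rule described in the header comment, the following standalone sketch merges consecutive block numbers into a pending read of at most a given limit, and emits the pending read when a block cannot be merged. It is not the code in read_stream.c: SketchRead and sketch_combine_reads are hypothetical names, the combine limit is passed as a plain parameter rather than taken from io_combine_limit, and no buffers are pinned or I/O started.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t BlockNumber;

/* Hypothetical type for this sketch: one combined read request. */
typedef struct SketchRead
{
    BlockNumber blocknum;       /* first block of the range */
    int         nblocks;        /* number of consecutive blocks */
} SketchRead;

/*
 * Merge a sequence of block numbers into combined reads of at most
 * combine_limit blocks each.  Returns the number of reads written to out,
 * which must have room for nblocks entries.
 */
static size_t
sketch_combine_reads(const BlockNumber *blocks, size_t nblocks,
                     int combine_limit, SketchRead *out)
{
    size_t      nreads = 0;
    SketchRead  pending = {0, 0};

    assert(combine_limit > 0);

    for (size_t i = 0; i < nblocks; i++)
    {
        /* Extend the pending read if this block is its next neighbor. */
        if (pending.nblocks > 0 &&
            pending.nblocks < combine_limit &&
            blocks[i] == pending.blocknum + (BlockNumber) pending.nblocks)
        {
            pending.nblocks++;
            continue;
        }

        /* Can't merge: emit the pending read and start a new one. */
        if (pending.nblocks > 0)
            out[nreads++] = pending;
        pending.blocknum = blocks[i];
        pending.nblocks = 1;
    }

    /* Flush whatever is left at the end of the stream. */
    if (pending.nblocks > 0)
        out[nreads++] = pending;

    return nreads;
}
```

Under these assumptions, feeding it the blocks shown in the diagram (10, 42, 43, 44, 60) with a limit of 16 gives three reads: 10..10, 42..44 and 60..60. In the real stream, block 10 was a cache hit and so has no entry in the ios queue.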
                                 89                 :                : #include "postgres.h"
                                 90                 :                : 
                                 91                 :                : #include "catalog/pg_tablespace.h"
                                 92                 :                : #include "miscadmin.h"
                                 93                 :                : #include "storage/fd.h"
                                 94                 :                : #include "storage/smgr.h"
                                 95                 :                : #include "storage/read_stream.h"
                                 96                 :                : #include "utils/memdebug.h"
                                 97                 :                : #include "utils/rel.h"
                                 98                 :                : #include "utils/spccache.h"
                                 99                 :                : 
                                100                 :                : typedef struct InProgressIO
                                101                 :                : {
                                102                 :                :     int16       buffer_index;
                                103                 :                :     ReadBuffersOperation op;
                                104                 :                : } InProgressIO;
                                105                 :                : 
                                106                 :                : /*
                                107                 :                :  * State for managing a stream of reads.
                                108                 :                :  */
                                109                 :                : struct ReadStream
                                110                 :                : {
                                111                 :                :     int16       max_ios;
                                112                 :                :     int16       ios_in_progress;
                                113                 :                :     int16       queue_size;
                                114                 :                :     int16       max_pinned_buffers;
                                115                 :                :     int16       pinned_buffers;
                                116                 :                :     int16       distance;
                                117                 :                :     bool        advice_enabled;
                                118                 :                : 
                                119                 :                :     /*
                                120                 :                :      * Small buffer of block numbers, useful for 'ungetting' to resolve flow
                                121                 :                :      * control problems when I/Os are split.  Also useful for batch-loading
                                122                 :                :      * block numbers in the fast path.
                                123                 :                :      */
                                124                 :                :     BlockNumber blocknums[16];
                                125                 :                :     int16       blocknums_count;
                                126                 :                :     int16       blocknums_next;
                                127                 :                : 
                                128                 :                :     /*
                                129                 :                :      * The callback that will tell us which block numbers to read, and an
                                130                 :                :      * opaque pointer that will be passed to it for its own purposes.
                                131                 :                :      */
                                132                 :                :     ReadStreamBlockNumberCB callback;
                                133                 :                :     void       *callback_private_data;
                                134                 :                : 
                                135                 :                :     /* Next expected block, for detecting sequential access. */
                                136                 :                :     BlockNumber seq_blocknum;
                                137                 :                : 
                                138                 :                :     /* The read operation we are currently preparing. */
                                139                 :                :     BlockNumber pending_read_blocknum;
                                140                 :                :     int16       pending_read_nblocks;
                                141                 :                : 
                                142                 :                :     /* Space for buffers and optional per-buffer private data. */
                                143                 :                :     size_t      per_buffer_data_size;
                                144                 :                :     void       *per_buffer_data;
                                145                 :                : 
                                146                 :                :     /* Read operations that have been started but not waited for yet. */
                                147                 :                :     InProgressIO *ios;
                                148                 :                :     int16       oldest_io_index;
                                149                 :                :     int16       next_io_index;
                                150                 :                : 
                                151                 :                :     bool        fast_path;
                                152                 :                : 
                                153                 :                :     /* Circular queue of buffers. */
                                154                 :                :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
                                155                 :                :     int16       next_buffer_index;  /* Index of next buffer to pin */
                                156                 :                :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
                                157                 :                : };
                                158                 :                : 
                                159                 :                : /*
                                160                 :                :  * Return a pointer to the per-buffer data by index.
                                161                 :                :  */
                                162                 :                : static inline void *
   11 tmunro@postgresql.or      163                 :GNC     1506190 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
                                164                 :                : {
                                165                 :        3012380 :     return (char *) stream->per_buffer_data +
                                166                 :        1506190 :         stream->per_buffer_data_size * buffer_index;
                                167                 :                : }
                                168                 :                : 
                                169                 :                : /*
                                170                 :                :  * Ask the callback which block it would like us to read next, with a small
                                171                 :                :  * buffer in front to allow read_stream_unget_block() to work and to allow the
                                172                 :                :  * fast path to skip this function and work directly from the array.
                                173                 :                :  */
                                174                 :                : static inline BlockNumber
                                175                 :        1506190 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
                                176                 :                : {
                                177         [ +  + ]:        1506190 :     if (stream->blocknums_next < stream->blocknums_count)
                                178                 :          29545 :         return stream->blocknums[stream->blocknums_next++];
                                179                 :                : 
                                180                 :                :     /*
                                181                 :                :      * We only bother to fetch one at a time here (but see the fast path which
                                182                 :                :      * uses more).
                                183                 :                :      */
                                184                 :        1476645 :     return stream->callback(stream,
                                185                 :                :                             stream->callback_private_data,
                                186                 :                :                             per_buffer_data);
                                187                 :                : }
                                188                 :                : 
                                189                 :                : /*
                                190                 :                :  * In order to deal with short reads in StartReadBuffers(), we sometimes need
                                191                 :                :  * to defer handling of a block until later.
                                192                 :                :  */
                                193                 :                : static inline void
   11 tmunro@postgresql.or      194                 :UNC           0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
                                195                 :                : {
                                196         [ #  # ]:              0 :     if (stream->blocknums_next == stream->blocknums_count)
                                197                 :                :     {
                                198                 :                :         /* Never initialized or entirely consumed.  Re-initialize. */
                                199                 :              0 :         stream->blocknums[0] = blocknum;
                                200                 :              0 :         stream->blocknums_count = 1;
                                201                 :              0 :         stream->blocknums_next = 0;
                                202                 :                :     }
                                203                 :                :     else
                                204                 :                :     {
                                205                 :                :         /* Must be the last value returned from blocknums array. */
                                206         [ #  # ]:              0 :         Assert(stream->blocknums_next > 0);
                                207                 :              0 :         stream->blocknums_next--;
                                208         [ #  # ]:              0 :         Assert(stream->blocknums[stream->blocknums_next] == blocknum);
                                209                 :                :     }
                                210                 :              0 : }
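
The report marks read_stream_unget_block() as never executed (UNC, 0 hits), so the test run gives no evidence of how its two branches behave. The standalone sketch below mimics the get/unget pair on a simplified structure to show both cases: ungetting into an empty or fully consumed array, and stepping back over the last value handed out from the array. SketchBlockBuffer, sketch_callback and the source_next/source_end fields are hypothetical stand-ins for ReadStream and its user-provided callback; they are assumptions made for illustration only.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t BlockNumber;

/* Assumed end-of-stream marker for this sketch. */
#define SKETCH_INVALID_BLOCK ((BlockNumber) 0xFFFFFFFF)

/* Hypothetical stand-in for the blocknums fields of ReadStream. */
typedef struct SketchBlockBuffer
{
    BlockNumber blocknums[16];
    int         count;          /* valid entries in blocknums */
    int         next;           /* next entry to hand out */
    BlockNumber source_next;    /* hypothetical callback state */
    BlockNumber source_end;     /* callback stops before this block */
} SketchBlockBuffer;

/* Hypothetical callback: yields source_next .. source_end - 1 in order. */
static BlockNumber
sketch_callback(SketchBlockBuffer *buf)
{
    if (buf->source_next >= buf->source_end)
        return SKETCH_INVALID_BLOCK;
    return buf->source_next++;
}

/* Serve buffered block numbers first, then fall back to the callback. */
static BlockNumber
sketch_get_block(SketchBlockBuffer *buf)
{
    if (buf->next < buf->count)
        return buf->blocknums[buf->next++];
    return sketch_callback(buf);
}

/* Push back the block number most recently returned by sketch_get_block. */
static void
sketch_unget_block(SketchBlockBuffer *buf, BlockNumber blocknum)
{
    if (buf->next == buf->count)
    {
        /* Array is empty or fully consumed: restart it with one entry. */
        buf->blocknums[0] = blocknum;
        buf->count = 1;
        buf->next = 0;
    }
    else
    {
        /* The value must be the last one handed out from the array. */
        assert(buf->next > 0);
        buf->next--;
        assert(buf->blocknums[buf->next] == blocknum);
    }
}
```

In this sketch, a block number that came straight from the callback is pushed back by reinitializing the array with a single entry, so the next get returns it again before the callback is consulted.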
                                211                 :                : 
                                212                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                                213                 :                : static void
   11 tmunro@postgresql.or      214                 :GNC      179148 : read_stream_fill_blocknums(ReadStream *stream)
                                215                 :                : {
                                216                 :                :     BlockNumber blocknum;
                                217                 :         179148 :     int         i = 0;
                                218                 :                : 
                                219                 :                :     do
                                220                 :                :     {
                                221                 :        1899178 :         blocknum = stream->callback(stream,
                                222                 :                :                                     stream->callback_private_data,
                                223                 :                :                                     NULL);
                                224                 :        1899178 :         stream->blocknums[i++] = blocknum;
                                225   [ +  +  +  + ]:        1899178 :     } while (i < lengthof(stream->blocknums) &&
                                226                 :                :              blocknum != InvalidBlockNumber);
                                227                 :         179148 :     stream->blocknums_count = i;
                                228                 :         179148 :     stream->blocknums_next = 0;
                                229                 :         179148 : }
                                230                 :                : #endif
                                231                 :                : 
                                232                 :                : static void
                                233                 :         805176 : read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
                                234                 :                : {
                                235                 :                :     bool        need_wait;
                                236                 :                :     int         nblocks;
                                237                 :                :     int         flags;
                                238                 :                :     int16       io_index;
                                239                 :                :     int16       overflow;
                                240                 :                :     int16       buffer_index;
                                241                 :                : 
                                242                 :                :     /* This should only be called with a pending read. */
                                243         [ -  + ]:         805176 :     Assert(stream->pending_read_nblocks > 0);
                                244         [ -  + ]:         805176 :     Assert(stream->pending_read_nblocks <= io_combine_limit);
                                245                 :                : 
                                246                 :                :     /* We had better not exceed the pin limit by starting this read. */
                                247         [ -  + ]:         805176 :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
                                248                 :                :            stream->max_pinned_buffers);
                                249                 :                : 
                                250                 :                :     /* We had better not be overwriting an existing pinned buffer. */
                                251         [ +  + ]:         805176 :     if (stream->pinned_buffers > 0)
                                252         [ -  + ]:           2488 :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
                                253                 :                :     else
                                254         [ -  + ]:         802688 :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
                                255                 :                : 
                                256                 :                :     /*
                                257                 :                :      * If advice hasn't been suppressed, this system supports it, and this
                                258                 :                :      * isn't a strictly sequential pattern, then we'll issue advice.
                                259                 :                :      */
                                260         [ +  + ]:         805176 :     if (!suppress_advice &&
                                261         [ +  + ]:         377738 :         stream->advice_enabled &&
                                262         [ +  + ]:          18150 :         stream->pending_read_blocknum != stream->seq_blocknum)
                                263                 :           1873 :         flags = READ_BUFFERS_ISSUE_ADVICE;
                                264                 :                :     else
                                265                 :         803303 :         flags = 0;
                                266                 :                : 
                                267                 :                :     /* We say how many blocks we want to read, but it may be fewer on return. */
                                268                 :         805176 :     buffer_index = stream->next_buffer_index;
                                269                 :         805176 :     io_index = stream->next_io_index;
                                270                 :         805176 :     nblocks = stream->pending_read_nblocks;
                                271                 :         805176 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                272                 :         805176 :                                  &stream->buffers[buffer_index],
                                273                 :                :                                  stream->pending_read_blocknum,
                                274                 :                :                                  &nblocks,
                                275                 :                :                                  flags);
                                276                 :         805176 :     stream->pinned_buffers += nblocks;
                                277                 :                : 
                                278                 :                :     /* Remember whether we need to wait before returning this buffer. */
                                279         [ +  + ]:         805176 :     if (!need_wait)
                                280                 :                :     {
                                281                 :                :         /* Look-ahead distance decays, no I/O necessary (behavior A). */
                                282         [ +  + ]:         559151 :         if (stream->distance > 1)
                                283                 :           1092 :             stream->distance--;
                                284                 :                :     }
                                285                 :                :     else
                                286                 :                :     {
                                287                 :                :         /*
                                288                 :                :          * Remember to call WaitReadBuffers() before returning head buffer.
                                289                 :                :          * Look-ahead distance will be adjusted after waiting.
                                290                 :                :          */
                                291                 :         246025 :         stream->ios[io_index].buffer_index = buffer_index;
                                292         [ +  + ]:         246025 :         if (++stream->next_io_index == stream->max_ios)
                                293                 :         232738 :             stream->next_io_index = 0;
                                294         [ -  + ]:         246025 :         Assert(stream->ios_in_progress < stream->max_ios);
                                295                 :         246025 :         stream->ios_in_progress++;
                                296                 :         246025 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
                                297                 :                :     }
                                298                 :                : 
                                299                 :                :     /*
                                300                 :                :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
                                301                 :                :      * we want it to wrap around at queue_size.  Slide overflowing buffers to
                                302                 :                :      * the front of the array.
                                303                 :                :      */
                                304                 :         805176 :     overflow = (buffer_index + nblocks) - stream->queue_size;
                                305         [ +  + ]:         805176 :     if (overflow > 0)
                                306                 :           1782 :         memmove(&stream->buffers[0],
                                307                 :           1782 :                 &stream->buffers[stream->queue_size],
                                308                 :                :                 sizeof(stream->buffers[0]) * overflow);
                                309                 :                : 
                                310                 :                :     /* Compute location of start of next read, without using % operator. */
                                311                 :         805176 :     buffer_index += nblocks;
                                312         [ +  + ]:         805176 :     if (buffer_index >= stream->queue_size)
                                313                 :         146665 :         buffer_index -= stream->queue_size;
                                314   [ +  -  -  + ]:         805176 :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                315                 :         805176 :     stream->next_buffer_index = buffer_index;
                                316                 :                : 
                                317                 :                :     /* Adjust the pending read to cover the remaining portion, if any. */
                                318                 :         805176 :     stream->pending_read_blocknum += nblocks;
                                319                 :         805176 :     stream->pending_read_nblocks -= nblocks;
                                320                 :         805176 : }
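
Illustrative sketch, not part of read_stream.c: the wrap-around handling at the end of the function above (slide overflowing entries to the front, then advance the index without the % operator) can be tried in isolation. The names DemoQueue and demo_queue_append are hypothetical, and the fixed 16-slot array is an assumption standing in for the queue_size + io_combine_limit - 1 allocation.

```c
#include <assert.h>
#include <string.h>

/*
 * Hypothetical miniature of the buffer queue: queue_size slots that wrap
 * around, followed by spare slots so a single write is always contiguous.
 */
typedef struct DemoQueue
{
    int         queue_size;     /* number of slots that wrap around */
    int         next_index;     /* where the next write starts */
    int         slots[16];      /* queue_size plus overflow space */
} DemoQueue;

/*
 * Write nitems values contiguously starting at next_index, possibly running
 * past queue_size into the spare slots.  Then slide any overflow to the
 * front of the array and advance next_index with compare-and-subtract.
 */
static void
demo_queue_append(DemoQueue *q, const int *items, int nitems)
{
    int         index = q->next_index;
    int         overflow;

    assert(nitems > 0 && nitems <= q->queue_size);
    assert(index + nitems <= (int) (sizeof(q->slots) / sizeof(q->slots[0])));

    memcpy(&q->slots[index], items, sizeof(items[0]) * nitems);

    /* Entries written past queue_size belong at the front of the queue. */
    overflow = (index + nitems) - q->queue_size;
    if (overflow > 0)
        memmove(&q->slots[0],
                &q->slots[q->queue_size],
                sizeof(q->slots[0]) * overflow);

    /* One subtraction suffices because nitems <= queue_size. */
    index += nitems;
    if (index >= q->queue_size)
        index -= q->queue_size;
    assert(index >= 0 && index < q->queue_size);
    q->next_index = index;
}
```

The single subtraction only works because one write never exceeds queue_size entries, which is why the sketch asserts that precondition.
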
                                321                 :                : 
                                322                 :                : static void
                                323                 :        1587164 : read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
                                324                 :                : {
                                325         [ +  + ]:        2430531 :     while (stream->ios_in_progress < stream->max_ios &&
                                326         [ +  + ]:        2425072 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
                                327                 :                :     {
                                328                 :                :         BlockNumber blocknum;
                                329                 :                :         int16       buffer_index;
                                330                 :                :         void       *per_buffer_data;
                                331                 :                : 
                                332         [ -  + ]:        1506190 :         if (stream->pending_read_nblocks == io_combine_limit)
                                333                 :                :         {
   11 tmunro@postgresql.or      334                 :UNC           0 :             read_stream_start_pending_read(stream, suppress_advice);
                                335                 :              0 :             suppress_advice = false;
                                336                 :              0 :             continue;
                                337                 :                :         }
                                338                 :                : 
                                339                 :                :         /*
                                340                 :                :          * See which block the callback wants next in the stream.  We need to
                                341                 :                :          * compute the index of the Nth block of the pending read including
                                342                 :                :          * wrap-around, but we don't want to use the expensive % operator.
                                343                 :                :          */
   11 tmunro@postgresql.or      344                 :GNC     1506190 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
                                345         [ +  + ]:        1506190 :         if (buffer_index >= stream->queue_size)
                                346                 :          19050 :             buffer_index -= stream->queue_size;
                                347   [ +  -  -  + ]:        1506190 :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
                                348                 :        1506190 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
                                349                 :        1506190 :         blocknum = read_stream_get_block(stream, per_buffer_data);
                                350         [ +  + ]:        1506190 :         if (blocknum == InvalidBlockNumber)
                                351                 :                :         {
                                352                 :                :             /* End of stream. */
                                353                 :         662823 :             stream->distance = 0;
                                354                 :         662823 :             break;
                                355                 :                :         }
                                356                 :                : 
                                357                 :                :         /* Can we merge it with the pending read? */
                                358         [ +  + ]:         843367 :         if (stream->pending_read_nblocks > 0 &&
                                359         [ +  - ]:          40325 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
                                360                 :                :         {
                                361                 :          40325 :             stream->pending_read_nblocks++;
                                362                 :          40325 :             continue;
                                363                 :                :         }
                                364                 :                : 
                                365                 :                :         /* We have to start the pending read before we can build another. */
    7                           366         [ -  + ]:         803042 :         while (stream->pending_read_nblocks > 0)
                                367                 :                :         {
   11 tmunro@postgresql.or      368                 :UNC           0 :             read_stream_start_pending_read(stream, suppress_advice);
                                369                 :              0 :             suppress_advice = false;
                                370         [ #  # ]:              0 :             if (stream->ios_in_progress == stream->max_ios)
                                371                 :                :             {
                                372                 :                :                 /* And we've hit the limit.  Rewind, and stop here. */
                                373                 :              0 :                 read_stream_unget_block(stream, blocknum);
                                374                 :              0 :                 return;
                                375                 :                :             }
                                376                 :                :         }
                                377                 :                : 
                                378                 :                :         /* This is the start of a new pending read. */
   11 tmunro@postgresql.or      379                 :GNC      803042 :         stream->pending_read_blocknum = blocknum;
                                380                 :         803042 :         stream->pending_read_nblocks = 1;
                                381                 :                :     }
                                382                 :                : 
                                383                 :                :     /*
                                384                 :                :      * We don't start the pending read just because we've hit the distance
                                385                 :                :      * limit, preferring to give it another chance to grow to full
                                386                 :                :      * io_combine_limit size once more buffers have been consumed.  However,
                                387                 :                :      * if we've already reached io_combine_limit, or we've reached the
                                388                 :                :      * distance limit and there isn't anything pinned yet, or the callback has
                                389                 :                :      * signaled end-of-stream, we start the read immediately.
                                390                 :                :      */
                                391         [ +  + ]:        1587164 :     if (stream->pending_read_nblocks > 0 &&
                                392         [ +  + ]:         828491 :         (stream->pending_read_nblocks == io_combine_limit ||
                                393         [ +  + ]:         827010 :          (stream->pending_read_nblocks == stream->distance &&
                                394         [ -  + ]:         800882 :           stream->pinned_buffers == 0) ||
                                395         [ +  + ]:          26128 :          stream->distance == 0) &&
                                396         [ +  + ]:         805213 :         stream->ios_in_progress < stream->max_ios)
                                397                 :         805176 :         read_stream_start_pending_read(stream, suppress_advice);
                                398                 :                : }
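
Illustrative sketch, not part of read_stream.c: the merge step in the loop above (extend the pending read when the next block is adjacent, otherwise start a new one) can be reduced to a small coalescing routine. DemoRead and demo_coalesce are hypothetical names. The sketch deliberately ignores the look-ahead distance, the max_ios limit and the deferred start of the final pending read, so it shows only the combining rule, not the full scheduling behavior.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical description of one combined read: a start block and a length. */
typedef struct DemoRead
{
    unsigned    start;
    int         nblocks;
} DemoRead;

/*
 * Coalesce a sequence of block numbers into reads of consecutive blocks, each
 * at most combine_limit blocks long.  "out" must have room for nblocks_in
 * entries.  Returns the number of reads produced.
 */
static size_t
demo_coalesce(const unsigned *blocks, size_t nblocks_in,
              int combine_limit, DemoRead *out)
{
    size_t      nreads = 0;
    DemoRead    pending = {0, 0};

    for (size_t i = 0; i < nblocks_in; i++)
    {
        /* Can we merge this block with the pending read? */
        if (pending.nblocks > 0 &&
            pending.nblocks < combine_limit &&
            pending.start + pending.nblocks == blocks[i])
        {
            pending.nblocks++;
            continue;
        }

        /* Emit the pending read before building another. */
        if (pending.nblocks > 0)
            out[nreads++] = pending;

        /* This block starts a new pending read. */
        pending.start = blocks[i];
        pending.nblocks = 1;
    }

    /* End of input: flush whatever is still pending. */
    if (pending.nblocks > 0)
        out[nreads++] = pending;

    return nreads;
}
```
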
                                399                 :                : 
                                400                 :                : /*
                                401                 :                :  * Create a new read stream object that can be used to perform the equivalent
                                402                 :                :  * of a series of ReadBuffer() calls for one fork of one relation.
                                403                 :                :  * Internally, it generates larger vectored reads where possible by looking
                                404                 :                :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
                                405                 :                :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
                                406                 :                :  * write extra data for each block into the space provided to it.  It will
                                407                 :                :  * also receive callback_private_data for its own purposes.
                                408                 :                :  */
                                409                 :                : ReadStream *
                                410                 :         314077 : read_stream_begin_relation(int flags,
                                411                 :                :                            BufferAccessStrategy strategy,
                                412                 :                :                            Relation rel,
                                413                 :                :                            ForkNumber forknum,
                                414                 :                :                            ReadStreamBlockNumberCB callback,
                                415                 :                :                            void *callback_private_data,
                                416                 :                :                            size_t per_buffer_data_size)
                                417                 :                : {
                                418                 :                :     ReadStream *stream;
                                419                 :                :     size_t      size;
                                420                 :                :     int16       queue_size;
                                421                 :                :     int16       max_ios;
                                422                 :                :     int         strategy_pin_limit;
                                423                 :                :     uint32      max_pinned_buffers;
                                424                 :                :     Oid         tablespace_id;
                                425                 :                :     SMgrRelation smgr;
                                426                 :                : 
                                427                 :         314077 :     smgr = RelationGetSmgr(rel);
                                428                 :                : 
                                429                 :                :     /*
                                430                 :                :      * Decide how many I/Os we will allow to run at the same time.  That
                                431                 :                :      * currently means advice to the kernel to tell it that we will soon read.
                                432                 :                :      * This number also affects how far we look ahead for opportunities to
                                433                 :                :      * start more I/Os.
                                434                 :                :      */
                                435                 :         314077 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
                                436   [ +  +  +  + ]:         619172 :     if (!OidIsValid(MyDatabaseId) ||
                                437         [ -  + ]:         374642 :         IsCatalogRelation(rel) ||
                                438                 :          69547 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
                                439                 :                :     {
                                440                 :                :         /*
                                441                 :                :          * Avoid circularity while trying to look up tablespace settings or
                                442                 :                :          * before spccache.c is ready.
                                443                 :                :          */
                                444                 :         244530 :         max_ios = effective_io_concurrency;
                                445                 :                :     }
                                446         [ +  + ]:          69547 :     else if (flags & READ_STREAM_MAINTENANCE)
                                447                 :           2782 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
                                448                 :                :     else
                                449                 :          66765 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
                                450                 :         314077 :     max_ios = Min(max_ios, PG_INT16_MAX);
                                451                 :                : 
                                452                 :                :     /*
                                453                 :                :      * Choose the maximum number of buffers we're prepared to pin.  We try to
                                454                 :                :      * pin fewer if we can, though.  We clamp it to at least io_combine_limit
                                455                 :                :      * so that we can have a chance to build up a full io_combine_limit sized
                                456                 :                :      * read, even when max_ios is zero.  Be careful not to allow int16 to
                                457                 :                :      * overflow (even though that's not possible with the current GUC range
                                458                 :                :      * limits), allowing also for the spare entry and the overflow space.
                                459                 :                :      */
                                460                 :         314077 :     max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
                                461                 :         314077 :     max_pinned_buffers = Min(max_pinned_buffers,
                                462                 :                :                              PG_INT16_MAX - io_combine_limit - 1);
                                463                 :                : 
                                464                 :                :     /* Give the strategy a chance to limit the number of buffers we pin. */
    8                           465                 :         314077 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
                                466                 :         314077 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
                                467                 :                : 
                                468                 :                :     /* Don't allow this backend to pin more than its share of buffers. */
   11                           469         [ +  + ]:         314077 :     if (SmgrIsTemp(smgr))
                                470                 :           5344 :         LimitAdditionalLocalPins(&max_pinned_buffers);
                                471                 :                :     else
                                472                 :         308733 :         LimitAdditionalPins(&max_pinned_buffers);
                                473         [ -  + ]:         314077 :     Assert(max_pinned_buffers > 0);
                                474                 :                : 
                                475                 :                :     /*
                                476                 :                :      * We need one extra entry for buffers and per-buffer data, because users
                                477                 :                :      * of per-buffer data have access to the object until the next call to
                                478                 :                :      * read_stream_next_buffer(), so we need a gap between the head and tail
                                479                 :                :      * of the queue so that we don't clobber it.
                                480                 :                :      */
                                481                 :         314077 :     queue_size = max_pinned_buffers + 1;
                                482                 :                : 
                                483                 :                :     /*
                                484                 :                :      * Allocate the object, the buffers, the ios and per_buffer_data space in
                                485                 :                :      * one big chunk.  Though we have queue_size buffers, we want to be able
                                486                 :                :      * to assume that all the buffers for a single read are contiguous (i.e.
                                487                 :                :      * don't wrap around halfway through), so we allow temporary overflows of
                                488                 :                :      * up to the maximum possible read size by allocating an extra
                                489                 :                :      * io_combine_limit - 1 elements.
                                490                 :                :      */
                                491                 :         314077 :     size = offsetof(ReadStream, buffers);
                                492                 :         314077 :     size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
                                493         [ +  - ]:         314077 :     size += sizeof(InProgressIO) * Max(1, max_ios);
                                494                 :         314077 :     size += per_buffer_data_size * queue_size;
                                495                 :         314077 :     size += MAXIMUM_ALIGNOF * 2;
                                496                 :         314077 :     stream = (ReadStream *) palloc(size);
                                497                 :         314077 :     memset(stream, 0, offsetof(ReadStream, buffers));
                                498                 :         314077 :     stream->ios = (InProgressIO *)
                                499                 :         314077 :         MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
                                500         [ -  + ]:         314077 :     if (per_buffer_data_size > 0)
   11 tmunro@postgresql.or      501                 :UNC           0 :         stream->per_buffer_data = (void *)
                                502         [ #  # ]:              0 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
                                503                 :                : 
                                504                 :                : #ifdef USE_PREFETCH
                                505                 :                : 
                                506                 :                :     /*
                                507                 :                :      * This system supports prefetching advice.  We can use it as long as
                                508                 :                :      * direct I/O isn't enabled, the caller hasn't promised sequential access
                                509                 :                :      * (overriding our detection heuristics), and max_ios hasn't been set to
                                510                 :                :      * zero.
                                511                 :                :      */
   11 tmunro@postgresql.or      512         [ +  + ]:GNC      314077 :     if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
                                513   [ +  +  +  - ]:         313974 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
                                514                 :                :         max_ios > 0)
                                515                 :           9570 :         stream->advice_enabled = true;
                                516                 :                : #endif
                                517                 :                : 
                                518                 :                :     /*
                                519                 :                :      * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
                                520                 :                :      * above.  If we had real asynchronous I/O we might need a slightly
                                521                 :                :      * different definition.
                                522                 :                :      */
                                523         [ -  + ]:         314077 :     if (max_ios == 0)
   11 tmunro@postgresql.or      524                 :UNC           0 :         max_ios = 1;
                                525                 :                : 
   11 tmunro@postgresql.or      526                 :GNC      314077 :     stream->max_ios = max_ios;
                                527                 :         314077 :     stream->per_buffer_data_size = per_buffer_data_size;
                                528                 :         314077 :     stream->max_pinned_buffers = max_pinned_buffers;
                                529                 :         314077 :     stream->queue_size = queue_size;
                                530                 :         314077 :     stream->callback = callback;
                                531                 :         314077 :     stream->callback_private_data = callback_private_data;
                                532                 :                : 
                                533                 :                :     /*
                                534                 :                :      * Skip the initial ramp-up phase if the caller says we're going to be
                                535                 :                :      * reading the whole relation.  This way we start out assuming we'll be
                                536                 :                :      * doing full io_combine_limit sized reads (behavior B).
                                537                 :                :      */
                                538         [ +  + ]:         314077 :     if (flags & READ_STREAM_FULL)
                                539                 :           2751 :         stream->distance = Min(max_pinned_buffers, io_combine_limit);
                                540                 :                :     else
                                541                 :         311326 :         stream->distance = 1;
                                542                 :                : 
                                543                 :                :     /*
                                544                 :                :      * Since we always access the same relation, we can initialize
                                545                 :                :      * parts of the ReadBuffersOperation objects and leave them that way, to
                                546                 :                :      * avoid wasting CPU cycles writing to them for each read.
                                547                 :                :      */
                                548         [ +  + ]:         705377 :     for (int i = 0; i < max_ios; ++i)
                                549                 :                :     {
                                550                 :         391300 :         stream->ios[i].op.rel = rel;
                                551                 :         391300 :         stream->ios[i].op.smgr = RelationGetSmgr(rel);
                                552                 :         391300 :         stream->ios[i].op.smgr_persistence = 0;
                                553                 :         391300 :         stream->ios[i].op.forknum = forknum;
                                554                 :         391300 :         stream->ios[i].op.strategy = strategy;
                                555                 :                :     }
                                556                 :                : 
                                557                 :         314077 :     return stream;
                                558                 :                : }
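
Illustrative sketch, not part of read_stream.c: the sizing arithmetic in the function above can be followed with concrete numbers. DemoSizing and demo_compute_sizing are hypothetical names. The sketch assumes the strategy limit and the per-backend pin limits do not reduce the result, so it covers only the Max/Min clamping, the spare queue entry and the overflow space.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical result of the sizing computation. */
typedef struct DemoSizing
{
    int         max_pinned_buffers; /* most buffers pinned at once */
    int         queue_size;         /* max_pinned_buffers plus one spare */
    int         buffer_slots;       /* queue_size plus overflow space */
} DemoSizing;

/*
 * Mirror the clamping steps: at least io_combine_limit so a full-sized read
 * can be built even when max_ios is small, and never so large that an int16
 * index could overflow once the spare entry and overflow space are added.
 */
static DemoSizing
demo_compute_sizing(int max_ios, int io_combine_limit)
{
    DemoSizing  s;
    int         max_pinned = max_ios * 4;

    if (max_pinned < io_combine_limit)
        max_pinned = io_combine_limit;
    if (max_pinned > INT16_MAX - io_combine_limit - 1)
        max_pinned = INT16_MAX - io_combine_limit - 1;

    s.max_pinned_buffers = max_pinned;
    /* One extra entry keeps the head from clobbering the tail. */
    s.queue_size = max_pinned + 1;
    /* Extra io_combine_limit - 1 slots let one read stay contiguous. */
    s.buffer_slots = s.queue_size + io_combine_limit - 1;
    return s;
}
```

With max_ios = 1 and io_combine_limit = 16, for example, this gives 16 pinned buffers, a queue of 17 entries and 32 buffer slots.
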
                                559                 :                : 
                                560                 :                : /*
                                561                 :                :  * Pull one pinned buffer out of a stream.  Each call returns successive
                                562                 :                :  * blocks in the order specified by the callback.  If per_buffer_data_size was
                                563                 :                :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
                                564                 :                :  * per-buffer data that the callback had a chance to populate, which remains
                                565                 :                :  * valid until the next call to read_stream_next_buffer().  When the stream
                                566                 :                :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
                                567                 :                :  * the stream early at any time by calling read_stream_end().
                                568                 :                :  */
                                569                 :                : Buffer
                                570                 :        3705969 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                                571                 :                : {
                                572                 :                :     Buffer      buffer;
                                573                 :                :     int16       oldest_buffer_index;
                                574                 :                : 
                                575                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                                576                 :                : 
                                577                 :                :     /*
                                578                 :                :      * A fast path for all-cached scans (behavior A).  This is the same as the
                                579                 :                :      * usual algorithm, but it is specialized for no I/O and no per-buffer
                                580                 :                :      * data, so we can skip the queue management code, stay in the same buffer
                                581                 :                :      * slot and use singular StartReadBuffer().
                                582                 :                :      */
                                583         [ +  + ]:        3705969 :     if (likely(stream->fast_path))
                                584                 :                :     {
                                585                 :                :         BlockNumber next_blocknum;
                                586                 :                : 
                                587                 :                :         /* Fast path assumptions. */
                                588         [ -  + ]:        1441940 :         Assert(stream->ios_in_progress == 0);
                                589         [ -  + ]:        1441940 :         Assert(stream->pinned_buffers == 1);
                                590         [ -  + ]:        1441940 :         Assert(stream->distance == 1);
    8                           591         [ -  + ]:        1441940 :         Assert(stream->pending_read_nblocks == 0);
   11                           592         [ -  + ]:        1441940 :         Assert(stream->per_buffer_data_size == 0);
                                593                 :                : 
                                594                 :                :         /* We're going to return the buffer we pinned last time. */
                                595                 :        1441940 :         oldest_buffer_index = stream->oldest_buffer_index;
                                596         [ -  + ]:        1441940 :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
                                597                 :                :                stream->next_buffer_index);
                                598                 :        1441940 :         buffer = stream->buffers[oldest_buffer_index];
                                599         [ -  + ]:        1441940 :         Assert(buffer != InvalidBuffer);
                                600                 :                : 
                                601                 :                :         /* Choose the next block to pin. */
                                602         [ +  + ]:        1441940 :         if (unlikely(stream->blocknums_next == stream->blocknums_count))
                                603                 :         179148 :             read_stream_fill_blocknums(stream);
                                604                 :        1441940 :         next_blocknum = stream->blocknums[stream->blocknums_next++];
                                605                 :                : 
    8                           606         [ +  + ]:        1441940 :         if (likely(next_blocknum != InvalidBlockNumber))
                                607                 :                :         {
                                608                 :                :             /*
                                609                 :                :              * Pin a buffer for the next call.  Same buffer entry, and
                                610                 :                :              * arbitrary I/O entry (they're all free).  We don't have to
                                611                 :                :              * adjust pinned_buffers because we're transferring one to caller
                                612                 :                :              * but pinning one more.
                                613                 :                :              */
                                614   [ +  +  +  + ]:        1374720 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
                                615                 :                :                                         &stream->buffers[oldest_buffer_index],
                                616                 :                :                                         next_blocknum,
                                617                 :                :                                         stream->advice_enabled ?
                                618                 :                :                                         READ_BUFFERS_ISSUE_ADVICE : 0)))
                                619                 :                :             {
                                620                 :                :                 /* Fast return. */
                                621                 :        1366469 :                 return buffer;
                                622                 :                :             }
                                623                 :                : 
                                624                 :                :             /* Next call must wait for I/O for the newly pinned buffer. */
   11                           625                 :           8251 :             stream->oldest_io_index = 0;
                                626                 :           8251 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
                                627                 :           8251 :             stream->ios_in_progress = 1;
                                628                 :           8251 :             stream->ios[0].buffer_index = oldest_buffer_index;
                                629                 :           8251 :             stream->seq_blocknum = next_blocknum + 1;
                                630                 :                :         }
                                631                 :                :         else
                                632                 :                :         {
                                633                 :                :             /* No more blocks, end of stream. */
    8                           634                 :          67220 :             stream->distance = 0;
                                635                 :          67220 :             stream->oldest_buffer_index = stream->next_buffer_index;
                                636                 :          67220 :             stream->pinned_buffers = 0;
                                637                 :                :         }
                                638                 :                : 
                                639                 :          75471 :         stream->fast_path = false;
   11                           640                 :          75471 :         return buffer;
                                641                 :                :     }
                                642                 :                : #endif
                                643                 :                : 
                                644         [ +  + ]:        2264029 :     if (unlikely(stream->pinned_buffers == 0))
                                645                 :                :     {
                                646         [ -  + ]:        1915347 :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
                                647                 :                : 
                                648                 :                :         /* End of stream reached?  */
                                649         [ +  + ]:        1915347 :         if (stream->distance == 0)
                                650                 :        1104303 :             return InvalidBuffer;
                                651                 :                : 
                                652                 :                :         /*
                                653                 :                :          * The usual order of operations is that we look ahead at the bottom
                                654                 :                :          * of this function after potentially finishing an I/O and making
                                655                 :                :          * space for more, but if we're just starting up we'll need to crank
                                656                 :                :          * the handle to get started.
                                657                 :                :          */
                                658                 :         811044 :         read_stream_look_ahead(stream, true);
                                659                 :                : 
                                660                 :                :         /* End of stream reached? */
                                661         [ +  + ]:         811044 :         if (stream->pinned_buffers == 0)
                                662                 :                :         {
                                663         [ -  + ]:         383606 :             Assert(stream->distance == 0);
                                664                 :         383606 :             return InvalidBuffer;
                                665                 :                :         }
                                666                 :                :     }
                                667                 :                : 
                                668                 :                :     /* Grab the oldest pinned buffer and associated per-buffer data. */
                                669         [ -  + ]:         776120 :     Assert(stream->pinned_buffers > 0);
                                670                 :         776120 :     oldest_buffer_index = stream->oldest_buffer_index;
                                671   [ +  -  -  + ]:         776120 :     Assert(oldest_buffer_index >= 0 &&
                                672                 :                :            oldest_buffer_index < stream->queue_size);
                                673                 :         776120 :     buffer = stream->buffers[oldest_buffer_index];
                                674         [ -  + ]:         776120 :     if (per_buffer_data)
   11 tmunro@postgresql.or      675                 :UNC           0 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
                                676                 :                : 
   11 tmunro@postgresql.or      677         [ -  + ]:GNC      776120 :     Assert(BufferIsValid(buffer));
                                678                 :                : 
                                679                 :                :     /* Do we have to wait for an associated I/O first? */
                                680         [ +  + ]:         776120 :     if (stream->ios_in_progress > 0 &&
                                681         [ +  + ]:         259753 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
                                682                 :                :     {
                                683                 :         254272 :         int16       io_index = stream->oldest_io_index;
                                684                 :                :         int16       distance;
                                685                 :                : 
                                686                 :                :         /* Sanity check that we still agree on the buffers. */
                                687         [ -  + ]:         254272 :         Assert(stream->ios[io_index].op.buffers ==
                                688                 :                :                &stream->buffers[oldest_buffer_index]);
                                689                 :                : 
                                690                 :         254272 :         WaitReadBuffers(&stream->ios[io_index].op);
                                691                 :                : 
                                692         [ -  + ]:         254272 :         Assert(stream->ios_in_progress > 0);
                                693                 :         254272 :         stream->ios_in_progress--;
                                694         [ +  + ]:         254272 :         if (++stream->oldest_io_index == stream->max_ios)
                                695                 :         240952 :             stream->oldest_io_index = 0;
                                696                 :                : 
                                697         [ +  + ]:         254272 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
                                698                 :                :         {
                                699                 :                :             /* Distance ramps up fast (behavior C). */
                                700                 :             84 :             distance = stream->distance * 2;
                                701                 :             84 :             distance = Min(distance, stream->max_pinned_buffers);
                                702                 :             84 :             stream->distance = distance;
                                703                 :                :         }
                                704                 :                :         else
                                705                 :                :         {
                                706                 :                :             /* No advice; move towards io_combine_limit (behavior B). */
                                707         [ -  + ]:         254188 :             if (stream->distance > io_combine_limit)
                                708                 :                :             {
   11 tmunro@postgresql.or      709                 :UNC           0 :                 stream->distance--;
                                710                 :                :             }
                                711                 :                :             else
                                712                 :                :             {
   11 tmunro@postgresql.or      713                 :GNC      254188 :                 distance = stream->distance * 2;
                                714                 :         254188 :                 distance = Min(distance, io_combine_limit);
                                715                 :         254188 :                 distance = Min(distance, stream->max_pinned_buffers);
                                716                 :         254188 :                 stream->distance = distance;
                                717                 :                :             }
                                718                 :                :         }
                                719                 :                :     }
                                720                 :                : 
                                721                 :                : #ifdef CLOBBER_FREED_MEMORY
                                722                 :                :     /* Clobber old buffer and per-buffer data for debugging purposes. */
                                723                 :         776120 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
                                724                 :                : 
                                725                 :                :     /*
                                726                 :                :      * The caller will get access to the per-buffer data, until the next call.
                                727                 :                :      * We wipe the one before, which is never occupied because queue_size
                                728                 :                :      * allowed one extra element.  This will hopefully trip up client code
                                729                 :                :      * that is holding a dangling pointer to it.
                                730                 :                :      */
                                731         [ -  + ]:         776120 :     if (stream->per_buffer_data)
   11 tmunro@postgresql.or      732         [ #  # ]:UNC           0 :         wipe_mem(get_per_buffer_data(stream,
                                733                 :                :                                      oldest_buffer_index == 0 ?
                                734                 :              0 :                                      stream->queue_size - 1 :
                                735                 :              0 :                                      oldest_buffer_index - 1),
                                736                 :                :                  stream->per_buffer_data_size);
                                737                 :                : #endif
                                738                 :                : 
                                739                 :                :     /* Pin transferred to caller. */
   11 tmunro@postgresql.or      740         [ -  + ]:GNC      776120 :     Assert(stream->pinned_buffers > 0);
                                741                 :         776120 :     stream->pinned_buffers--;
                                742                 :                : 
                                743                 :                :     /* Advance oldest buffer, with wrap-around. */
                                744                 :         776120 :     stream->oldest_buffer_index++;
                                745         [ +  + ]:         776120 :     if (stream->oldest_buffer_index == stream->queue_size)
                                746                 :         141420 :         stream->oldest_buffer_index = 0;
                                747                 :                : 
                                748                 :                :     /* Prepare for the next call. */
                                749                 :         776120 :     read_stream_look_ahead(stream, false);
                                750                 :                : 
                                751                 :                : #ifndef READ_STREAM_DISABLE_FAST_PATH
                                752                 :                :     /* See if we can take the fast path for all-cached scans next time. */
                                753         [ +  + ]:         776120 :     if (stream->ios_in_progress == 0 &&
                                754         [ +  + ]:         545989 :         stream->pinned_buffers == 1 &&
                                755         [ +  + ]:         158826 :         stream->distance == 1 &&
    8                           756         [ +  + ]:         151582 :         stream->pending_read_nblocks == 0 &&
   11                           757         [ +  - ]:         151432 :         stream->per_buffer_data_size == 0)
                                758                 :                :     {
                                759                 :         151432 :         stream->fast_path = true;
                                760                 :                :     }
                                761                 :                : #endif
                                762                 :                : 
                                763                 :         776120 :     return buffer;
                                764                 :                : }
                                765                 :                : 
                                766                 :                : /*
                                767                 :                :  * Reset a read stream by releasing any queued up buffers, allowing the stream
                                768                 :                :  * to be used again for different blocks.  This can be used to clear an
                                769                 :                :  * end-of-stream condition and start again, or to throw away blocks that were
                                770                 :                :  * speculatively read and read some different blocks instead.
                                771                 :                :  */
                                772                 :                : void
                                773                 :         810476 : read_stream_reset(ReadStream *stream)
                                774                 :                : {
                                775                 :                :     Buffer      buffer;
                                776                 :                : 
                                777                 :                :     /* Stop looking ahead. */
                                778                 :         810476 :     stream->distance = 0;
                                779                 :                : 
                                780                 :                :     /* Forget buffered block numbers and fast path state. */
    8                           781                 :         810476 :     stream->blocknums_next = 0;
                                782                 :         810476 :     stream->blocknums_count = 0;
                                783                 :         810476 :     stream->fast_path = false;
                                784                 :                : 
                                785                 :                :     /* Unpin anything that wasn't consumed. */
   11                           786         [ +  + ]:         901220 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
                                787                 :          90744 :         ReleaseBuffer(buffer);
                                788                 :                : 
                                789         [ -  + ]:         810476 :     Assert(stream->pinned_buffers == 0);
                                790         [ -  + ]:         810476 :     Assert(stream->ios_in_progress == 0);
                                791                 :                : 
                                792                 :                :     /* Start off assuming data is cached. */
                                793                 :         810476 :     stream->distance = 1;
                                794                 :         810476 : }
                                795                 :                : 
                                796                 :                : /*
                                797                 :                :  * Release and free a read stream.
                                798                 :                :  */
                                799                 :                : void
                                800                 :         312912 : read_stream_end(ReadStream *stream)
                                801                 :                : {
                                802                 :         312912 :     read_stream_reset(stream);
                                803                 :         312912 :     pfree(stream);
                                804                 :         312912 : }
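
Note on the look-ahead distance adjustment in read_stream_next_buffer() (source lines 697-718): after waiting for an I/O, the stream doubles its distance up to max_pinned_buffers when advice was issued (behavior C). Otherwise it moves toward io_combine_limit (behavior B), decrementing by one when it is above that limit. The sketch below models only that arithmetic so it can be checked in isolation. DistanceModel, min_int() and adjust_after_io() are hypothetical names introduced for illustration, not part of read_stream.c, and io_combine_limit is treated here as a plain struct field rather than a configuration setting.

```c
#include <assert.h>

/*
 * Hypothetical stand-in for the three values the adjustment depends on.
 * In read_stream.c these are stream->distance, stream->max_pinned_buffers
 * and the io_combine_limit setting.
 */
typedef struct DistanceModel
{
    int distance;
    int max_pinned_buffers;
    int io_combine_limit;
} DistanceModel;

static int
min_int(int a, int b)
{
    return a < b ? a : b;
}

/*
 * Model of the adjustment made after an I/O wait completes.
 * advice_issued corresponds to READ_BUFFERS_ISSUE_ADVICE being set
 * on the completed operation.
 */
static void
adjust_after_io(DistanceModel *m, int advice_issued)
{
    assert(m->distance >= 1);

    if (advice_issued)
    {
        /* Behavior C: ramp up fast, capped by the pin limit. */
        m->distance = min_int(m->distance * 2, m->max_pinned_buffers);
    }
    else if (m->distance > m->io_combine_limit)
    {
        /* Behavior B: above the combine limit, decay one step at a time. */
        m->distance--;
    }
    else
    {
        /* Behavior B: double, capped by both limits. */
        int distance = m->distance * 2;

        distance = min_int(distance, m->io_combine_limit);
        distance = min_int(distance, m->max_pinned_buffers);
        m->distance = distance;
    }
}
```

Under these assumptions, starting from distance 1 with io_combine_limit 16 and max_pinned_buffers 64, repeated calls without advice give 2, 4, 8, 16 and then stay at 16. Calls with advice continue to 32 and 64 and then stay at 64. A later call without advice steps down to 63, which is the decrement branch the report shows as not covered (source line 709).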
        

Generated by: LCOV version 2.1-beta2-3-g6141622