LCOV - differential code coverage report

Current view:   top level - src/backend/access/transam - xlogprefetcher.c (source / functions)
Current:        Differential Code Coverage HEAD vs 15
Current Date:   2023-04-08 15:15:32
Baseline:       15
Baseline Date:  2023-04-08 15:09:40

            Coverage  Total  Hit  UNC  LBC  UIC  UBC  GIC  GNC  CBC  EUB  ECB  DCB
Lines:      87.7 %    292    256  1    1    11   23   69   19   168  13   84   3
Functions:  91.7 %    24     22   1 1 5 3 14 1 7

Legend: Lines: hit / not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * xlogprefetcher.c
       4                 :  *      Prefetching support for recovery.
       5                 :  *
       6                 :  * Portions Copyright (c) 2022-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *      src/backend/access/transam/xlogprefetcher.c
      12                 :  *
      13                 :  * This module provides a drop-in replacement for an XLogReader that tries to
      14                 :  * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
      15                 :  * accessed in the near future are not already in the buffer pool, it initiates
      16                 :  * I/Os that might complete before the caller eventually needs the data.  When
      17                 :  * referenced blocks are found in the buffer pool already, the buffer is
      18                 :  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
      19                 :  * avoid a second buffer mapping table lookup.
      20                 :  *
       21                 :  * Currently, only the main fork is considered for prefetching, and
      22                 :  * prefetching is only effective on systems where PrefetchBuffer() does
      23                 :  * something useful (mainly Linux).
      24                 :  *
      25                 :  *-------------------------------------------------------------------------
      26                 :  */
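
For orientation, here is a minimal sketch of how recovery code might drive this module, assuming the public API declared in access/xlogprefetcher.h (XLogPrefetcherAllocate, XLogPrefetcherBeginRead, XLogPrefetcherReadRecord, XLogPrefetcherFree); the caller and its error handling are illustrative, not part of this file.

    /* Sketch only: a hypothetical caller of the prefetcher API. */
    #include "access/xlogprefetcher.h"

    static void
    replay_with_prefetching(XLogReaderState *reader, XLogRecPtr start_lsn)
    {
        XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(reader);
        char       *errmsg = NULL;

        XLogPrefetcherBeginRead(prefetcher, start_lsn);
        for (;;)
        {
            /* Decode one record; look-ahead and prefetching happen as a side effect. */
            XLogRecord *record = XLogPrefetcherReadRecord(prefetcher, &errmsg);

            if (record == NULL)
                break;          /* end of WAL, or an error reported via errmsg */
            /* ... apply the record here ... */
        }
        XLogPrefetcherFree(prefetcher);
    }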
      27                 : 
      28                 : #include "postgres.h"
      29                 : 
      30                 : #include "access/xlog.h"
      31                 : #include "access/xlogprefetcher.h"
      32                 : #include "access/xlogreader.h"
      33                 : #include "access/xlogutils.h"
      34                 : #include "catalog/pg_class.h"
      35                 : #include "catalog/pg_control.h"
      36                 : #include "catalog/storage_xlog.h"
      37                 : #include "commands/dbcommands_xlog.h"
       38                 : #include "funcapi.h"
       39                 : #include "miscadmin.h"
       40                 : #include "pgstat.h"
       41                 : #include "port/atomics.h"
       42                 : #include "storage/bufmgr.h"
       43                 : #include "storage/shmem.h"
       44                 : #include "storage/smgr.h"
       45                 : #include "utils/fmgrprotos.h"
       46                 : #include "utils/guc_hooks.h"
       47                 : #include "utils/hsearch.h"
       48                 : #include "utils/timestamp.h"
      49                 : 
      50                 : /*
      51                 :  * Every time we process this much WAL, we'll update the values in
      52                 :  * pg_stat_recovery_prefetch.
      53                 :  */
      54                 : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
      55                 : 
      56                 : /*
      57                 :  * To detect repeated access to the same block and skip useless extra system
      58                 :  * calls, we remember a small window of recently prefetched blocks.
      59                 :  */
      60                 : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
      61                 : 
      62                 : /*
      63                 :  * When maintenance_io_concurrency is not saturated, we're prepared to look
      64                 :  * ahead up to N times that number of block references.
      65                 :  */
      66                 : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
      67                 : 
      68                 : /* Define to log internal debugging messages. */
      69                 : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
      70                 : 
      71                 : /* GUCs */
      72                 : int         recovery_prefetch = RECOVERY_PREFETCH_TRY;
      73                 : 
      74                 : #ifdef USE_PREFETCH
      75                 : #define RecoveryPrefetchEnabled() \
      76                 :         (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
      77                 :          maintenance_io_concurrency > 0)
      78                 : #else
      79                 : #define RecoveryPrefetchEnabled() false
      80                 : #endif
      81                 : 
      82                 : static int  XLogPrefetchReconfigureCount = 0;
      83                 : 
      84                 : /*
      85                 :  * Enum used to report whether an IO should be started.
      86                 :  */
      87                 : typedef enum
      88                 : {
      89                 :     LRQ_NEXT_NO_IO,
      90                 :     LRQ_NEXT_IO,
      91                 :     LRQ_NEXT_AGAIN
      92                 : } LsnReadQueueNextStatus;
      93                 : 
      94                 : /*
      95                 :  * Type of callback that can decide which block to prefetch next.  For now
      96                 :  * there is only one.
      97                 :  */
      98                 : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
      99                 :                                                        XLogRecPtr *lsn);
     100                 : 
     101                 : /*
      102                 :  * A simple circular queue of LSNs, used to control the number of
     103                 :  * (potentially) inflight IOs.  This stands in for a later more general IO
     104                 :  * control mechanism, which is why it has the apparently unnecessary
     105                 :  * indirection through a function pointer.
     106                 :  */
     107                 : typedef struct LsnReadQueue
     108                 : {
     109                 :     LsnReadQueueNextFun next;
     110                 :     uintptr_t   lrq_private;
     111                 :     uint32      max_inflight;
     112                 :     uint32      inflight;
     113                 :     uint32      completed;
     114                 :     uint32      head;
     115                 :     uint32      tail;
     116                 :     uint32      size;
     117                 :     struct
     118                 :     {
     119                 :         bool        io;
     120                 :         XLogRecPtr  lsn;
     121                 :     }           queue[FLEXIBLE_ARRAY_MEMBER];
     122                 : } LsnReadQueue;
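
As a reading aid, the ring's occupancy can be computed from head, tail and size; lrq_prefetch() below keeps that occupancy equal to inflight + completed, and one slot is always left unused so that head == tail unambiguously means "empty".  The helper below is illustrative only and does not appear in the file.

    /* Illustrative helper, assuming the LsnReadQueue layout shown above. */
    static inline uint32
    lrq_occupancy(const LsnReadQueue *lrq)
    {
        /* Number of entries between tail (oldest) and head (next free slot). */
        return (lrq->head + lrq->size - lrq->tail) % lrq->size;
    }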
     123                 : 
     124                 : /*
     125                 :  * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
      126                 :  * blocks that will soon be referenced, to try to avoid IO stalls.
     127                 :  */
     128                 : struct XLogPrefetcher
     129                 : {
     130                 :     /* WAL reader and current reading state. */
     131                 :     XLogReaderState *reader;
     132                 :     DecodedXLogRecord *record;
     133                 :     int         next_block_id;
     134                 : 
     135                 :     /* When to publish stats. */
     136                 :     XLogRecPtr  next_stats_shm_lsn;
     137                 : 
     138                 :     /* Book-keeping to avoid accessing blocks that don't exist yet. */
     139                 :     HTAB       *filter_table;
     140                 :     dlist_head  filter_queue;
     141                 : 
     142                 :     /* Book-keeping to avoid repeat prefetches. */
     143                 :     RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     144                 :     BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     145                 :     int         recent_idx;
     146                 : 
     147                 :     /* Book-keeping to disable prefetching temporarily. */
     148                 :     XLogRecPtr  no_readahead_until;
     149                 : 
     150                 :     /* IO depth manager. */
     151                 :     LsnReadQueue *streaming_read;
     152                 : 
     153                 :     XLogRecPtr  begin_ptr;
     154                 : 
     155                 :     int         reconfigure_count;
     156                 : };
     157                 : 
     158                 : /*
     159                 :  * A temporary filter used to track block ranges that haven't been created
     160                 :  * yet, whole relations that haven't been created yet, and whole relations
     161                 :  * that (we assume) have already been dropped, or will be created by bulk WAL
      162                 :  * operations.
     163                 :  */
     164                 : typedef struct XLogPrefetcherFilter
     165                 : {
     166                 :     RelFileLocator rlocator;
     167                 :     XLogRecPtr  filter_until_replayed;
     168                 :     BlockNumber filter_from_block;
     169                 :     dlist_node  link;
     170                 : } XLogPrefetcherFilter;
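
These filter entries are manipulated by three helpers defined further down (XLogPrefetcherAddFilter, XLogPrefetcherIsFiltered, XLogPrefetcherCompleteFilters).  A minimal sketch of the intended call pattern follows; the wrapper function and its parameter names are hypothetical.

    /* Sketch only: how the filter helpers are meant to be combined. */
    static void
    filter_pattern_sketch(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
                          BlockNumber first_affected_block, XLogRecPtr record_lsn,
                          XLogRecPtr replaying_lsn)
    {
        /* A decoded record creates, copies or truncates something in rlocator... */
        XLogPrefetcherAddFilter(prefetcher, rlocator, first_affected_block, record_lsn);

        /* ...so look-ahead skips block references at or beyond that block... */
        if (XLogPrefetcherIsFiltered(prefetcher, rlocator, first_affected_block))
        {
            /* do not prefetch this block */
        }

        /* ...until replay has passed record_lsn and the filter is retired. */
        XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
    }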
     171                 : 
     172                 : /*
     173                 :  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
     174                 :  */
     175                 : typedef struct XLogPrefetchStats
     176                 : {
     177                 :     pg_atomic_uint64 reset_time;    /* Time of last reset. */
     178                 :     pg_atomic_uint64 prefetch;  /* Prefetches initiated. */
     179                 :     pg_atomic_uint64 hit;       /* Blocks already in cache. */
     180                 :     pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
     181                 :     pg_atomic_uint64 skip_new;  /* New/missing blocks filtered. */
     182                 :     pg_atomic_uint64 skip_fpw;  /* FPWs skipped. */
     183                 :     pg_atomic_uint64 skip_rep;  /* Repeat accesses skipped. */
     184                 : 
     185                 :     /* Dynamic values */
     186                 :     int         wal_distance;   /* Number of WAL bytes ahead. */
     187                 :     int         block_distance; /* Number of block references ahead. */
     188                 :     int         io_depth;       /* Number of I/Os in progress. */
     189                 : } XLogPrefetchStats;
     190                 : 
     191                 : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
     192                 :                                            RelFileLocator rlocator,
     193                 :                                            BlockNumber blockno,
     194                 :                                            XLogRecPtr lsn);
     195                 : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
     196                 :                                             RelFileLocator rlocator,
     197                 :                                             BlockNumber blockno);
     198                 : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
     199                 :                                                  XLogRecPtr replaying_lsn);
     200                 : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
     201                 :                                                       XLogRecPtr *lsn);
     202                 : 
     203                 : static XLogPrefetchStats *SharedStats;
     204                 : 
     205                 : static inline LsnReadQueue *
     206 CBC        2457 : lrq_alloc(uint32 max_distance,
     207                 :           uint32 max_inflight,
     208                 :           uintptr_t lrq_private,
     209                 :           LsnReadQueueNextFun next)
     210                 : {
     211                 :     LsnReadQueue *lrq;
     212                 :     uint32      size;
     213                 : 
     214            2457 :     Assert(max_distance >= max_inflight);
     215                 : 
     216            2457 :     size = max_distance + 1;    /* full ring buffer has a gap */
     217            2457 :     lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
     218            2457 :     lrq->lrq_private = lrq_private;
     219            2457 :     lrq->max_inflight = max_inflight;
     220            2457 :     lrq->size = size;
     221            2457 :     lrq->next = next;
     222            2457 :     lrq->head = 0;
     223            2457 :     lrq->tail = 0;
     224            2457 :     lrq->inflight = 0;
     225            2457 :     lrq->completed = 0;
     226                 : 
     227            2457 :     return lrq;
     228                 : }
     229                 : 
     230                 : static inline void
     231            2423 : lrq_free(LsnReadQueue *lrq)
     232                 : {
     233            2423 :     pfree(lrq);
     234            2423 : }
     235                 : 
     236                 : static inline uint32
     237           57369 : lrq_inflight(LsnReadQueue *lrq)
     238                 : {
     239           57369 :     return lrq->inflight;
     240                 : }
     241                 : 
     242                 : static inline uint32
     243           57369 : lrq_completed(LsnReadQueue *lrq)
     244                 : {
     245           57369 :     return lrq->completed;
     246                 : }
     247                 : 
     248                 : static inline void
     249         2506697 : lrq_prefetch(LsnReadQueue *lrq)
     250                 : {
     251                 :     /* Try to start as many IOs as we can within our limits. */
     252         7668089 :     while (lrq->inflight < lrq->max_inflight &&
     253         5149006 :            lrq->inflight + lrq->completed < lrq->size - 1)
     254                 :     {
     255         2997153 :         Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
     256         2997153 :         switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
     257                 :         {
     258          342427 :             case LRQ_NEXT_AGAIN:
     259          342427 :                 return;
     260           40647 :             case LRQ_NEXT_IO:
     261           40647 :                 lrq->queue[lrq->head].io = true;
     262           40647 :                 lrq->inflight++;
     263           40647 :                 break;
     264         2614048 :             case LRQ_NEXT_NO_IO:
     265         2614048 :                 lrq->queue[lrq->head].io = false;
     266         2614048 :                 lrq->completed++;
     267         2614048 :                 break;
     268                 :         }
     269         2654695 :         lrq->head++;
     270         2654695 :         if (lrq->head == lrq->size)
     271           64701 :             lrq->head = 0;
     272                 :     }
     273                 : }
     274                 : 
     275                 : static inline void
     276         2506683 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
     277                 : {
     278                 :     /*
     279                 :      * We know that LSNs before 'lsn' have been replayed, so we can now assume
     280                 :      * that any IOs that were started before then have finished.
     281                 :      */
     282         7668057 :     while (lrq->tail != lrq->head &&
     283         5119675 :            lrq->queue[lrq->tail].lsn < lsn)
     284                 :     {
     285         2654691 :         if (lrq->queue[lrq->tail].io)
     286           40647 :             lrq->inflight--;
     287                 :         else
     288         2614044 :             lrq->completed--;
     289         2654691 :         lrq->tail++;
     290         2654691 :         if (lrq->tail == lrq->size)
     291           64701 :             lrq->tail = 0;
     292                 :     }
     293         2506683 :     if (RecoveryPrefetchEnabled())
     294         2506683 :         lrq_prefetch(lrq);
     295         2506652 : }
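
To make the division of labour concrete, here is a minimal sketch of the lrq_* lifecycle; the callback my_next, the wrapper function and the numeric limits are illustrative stand-ins and are not part of this file.

    /* Sketch only: a trivial LsnReadQueueNextFun and the expected call sequence. */
    static LsnReadQueueNextStatus
    my_next(uintptr_t lrq_private, XLogRecPtr *lsn)
    {
        /* Examine the next look-ahead block and decide whether to start an IO. */
        return LRQ_NEXT_AGAIN;  /* placeholder: nothing to look at yet */
    }

    static void
    lrq_lifecycle_sketch(XLogRecPtr replayed_lsn)
    {
        /* max_distance (first argument) must be >= max_inflight (second). */
        LsnReadQueue *lrq = lrq_alloc(40, 10, (uintptr_t) 0, my_next);

        /*
         * After each record is replayed: retire entries whose LSNs have now
         * been replayed, then call my_next() again to refill within the limits.
         */
        lrq_complete_lsn(lrq, replayed_lsn);

        lrq_free(lrq);
    }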
     296                 : 
     297                 : size_t
     298            2738 : XLogPrefetchShmemSize(void)
     299                 : {
     300            2738 :     return sizeof(XLogPrefetchStats);
     301                 : }
     302                 : 
     303                 : /*
     304                 :  * Reset all counters to zero.
     305                 :  */
     306                 : void
     307 UBC           0 : XLogPrefetchResetStats(void)
     308                 : {
     309               0 :     pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     310               0 :     pg_atomic_write_u64(&SharedStats->prefetch, 0);
     311               0 :     pg_atomic_write_u64(&SharedStats->hit, 0);
     312               0 :     pg_atomic_write_u64(&SharedStats->skip_init, 0);
     313               0 :     pg_atomic_write_u64(&SharedStats->skip_new, 0);
     314               0 :     pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
     315               0 :     pg_atomic_write_u64(&SharedStats->skip_rep, 0);
     316               0 : }
     317                 : 
     318                 : void
     319 CBC        1826 : XLogPrefetchShmemInit(void)
     320                 : {
     321                 :     bool        found;
     322                 : 
     323            1826 :     SharedStats = (XLogPrefetchStats *)
     324            1826 :         ShmemInitStruct("XLogPrefetchStats",
     325                 :                         sizeof(XLogPrefetchStats),
     326                 :                         &found);
     327                 : 
     328            1826 :     if (!found)
     329                 :     {
     330            1826 :         pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     331            1826 :         pg_atomic_init_u64(&SharedStats->prefetch, 0);
     332            1826 :         pg_atomic_init_u64(&SharedStats->hit, 0);
     333            1826 :         pg_atomic_init_u64(&SharedStats->skip_init, 0);
     334            1826 :         pg_atomic_init_u64(&SharedStats->skip_new, 0);
     335            1826 :         pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
     336            1826 :         pg_atomic_init_u64(&SharedStats->skip_rep, 0);
     337                 :     }
     338            1826 : }
     339                 : 
     340                 : /*
     341                 :  * Called when any GUC is changed that affects prefetching.
     342                 :  */
     343                 : void
     344               9 : XLogPrefetchReconfigure(void)
     345                 : {
     346               9 :     XLogPrefetchReconfigureCount++;
     347               9 : }
     348                 : 
     349                 : /*
     350                 :  * Increment a counter in shared memory.  This is equivalent to *counter++ on a
     351                 :  * plain uint64 without any memory barrier or locking, except on platforms
     352                 :  * where readers can't read uint64 without possibly observing a torn value.
     353                 :  */
     354                 : static inline void
     355         2647357 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
     356                 : {
     357         2647357 :     Assert(AmStartupProcess() || !IsUnderPostmaster);
     358         2647357 :     pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
     359         2647357 : }
     360                 : 
     361                 : /*
     362                 :  * Create a prefetcher that is ready to begin prefetching blocks referenced by
     363                 :  * WAL records.
     364                 :  */
     365                 : XLogPrefetcher *
     366            1176 : XLogPrefetcherAllocate(XLogReaderState *reader)
     367                 : {
     368                 :     XLogPrefetcher *prefetcher;
     369                 :     static HASHCTL hash_table_ctl = {
     370                 :         .keysize = sizeof(RelFileLocator),
     371                 :         .entrysize = sizeof(XLogPrefetcherFilter)
     372                 :     };
     373                 : 
     374            1176 :     prefetcher = palloc0(sizeof(XLogPrefetcher));
     375                 : 
     376            1176 :     prefetcher->reader = reader;
     377            1176 :     prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
     378                 :                                            &hash_table_ctl,
     379                 :                                            HASH_ELEM | HASH_BLOBS);
     380            1176 :     dlist_init(&prefetcher->filter_queue);
     381                 : 
     382            1176 :     SharedStats->wal_distance = 0;
     383            1176 :     SharedStats->block_distance = 0;
     384            1176 :     SharedStats->io_depth = 0;
     385                 : 
     386                 :     /* First usage will cause streaming_read to be allocated. */
     387            1176 :     prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
     388                 : 
     389            1176 :     return prefetcher;
     390                 : }
     391                 : 
     392                 : /*
     393                 :  * Destroy a prefetcher and release all resources.
     394                 :  */
     395                 : void
     396            1142 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
     397                 : {
     398            1142 :     lrq_free(prefetcher->streaming_read);
     399            1142 :     hash_destroy(prefetcher->filter_table);
     400            1142 :     pfree(prefetcher);
     401            1142 : }
     402                 : 
     403                 : /*
     404                 :  * Provide access to the reader.
     405                 :  */
     406                 : XLogReaderState *
     407         2506604 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
     408                 : {
     409         2506604 :     return prefetcher->reader;
     410                 : }
     411                 : 
     412                 : /*
     413                 :  * Update the statistics visible in the pg_stat_recovery_prefetch view.
     414                 :  */
     415                 : void
     416           57355 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
     417                 : {
     418                 :     uint32      io_depth;
     419                 :     uint32      completed;
     420                 :     int64       wal_distance;
     421                 : 
     422                 : 
     423                 :     /* How far ahead of replay are we now? */
     424           57355 :     if (prefetcher->reader->decode_queue_tail)
     425                 :     {
     426           44411 :         wal_distance =
     427           44411 :             prefetcher->reader->decode_queue_tail->lsn -
     428           44411 :             prefetcher->reader->decode_queue_head->lsn;
     429                 :     }
     430                 :     else
     431                 :     {
     432           12944 :         wal_distance = 0;
     433                 :     }
     434                 : 
     435                 :     /* How many IOs are currently in flight and completed? */
     436           57355 :     io_depth = lrq_inflight(prefetcher->streaming_read);
     437           57355 :     completed = lrq_completed(prefetcher->streaming_read);
     438                 : 
     439                 :     /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
     440           57355 :     SharedStats->io_depth = io_depth;
     441           57355 :     SharedStats->block_distance = io_depth + completed;
     442           57355 :     SharedStats->wal_distance = wal_distance;
     443                 : 
     444           57355 :     prefetcher->next_stats_shm_lsn =
     445           57355 :         prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
     446           57355 : }
     447                 : 
     448                 : /*
     449                 :  * A callback that examines the next block reference in the WAL, and possibly
     450                 :  * starts an IO so that a later read will be fast.
     451                 :  *
     452                 :  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
     453                 :  *
     454                 :  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
     455                 :  * that isn't in the buffer pool, and the kernel has been asked to start
     456                 :  * reading it to make a future read system call faster. An LSN is written to
     457                 :  * *lsn, and the I/O will be considered to have completed once that LSN is
     458                 :  * replayed.
     459                 :  *
      460                 :  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
      461                 :  * that it was already in the buffer pool, or we decided for various reasons
      462                 :  * not to prefetch.
     463                 :  */
     464                 : static LsnReadQueueNextStatus
     465         2997153 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
     466                 : {
     467         2997153 :     XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
     468         2997153 :     XLogReaderState *reader = prefetcher->reader;
     469         2997153 :     XLogRecPtr  replaying_lsn = reader->ReadRecPtr;
     470                 : 
     471                 :     /*
     472                 :      * We keep track of the record and block we're up to between calls with
     473                 :      * prefetcher->record and prefetcher->next_block_id.
     474                 :      */
     475                 :     for (;;)
     476         2504015 :     {
     477                 :         DecodedXLogRecord *record;
     478                 : 
     479                 :         /* Try to read a new future record, if we don't already have one. */
     480         5501168 :         if (prefetcher->record == NULL)
     481                 :         {
     482                 :             bool        nonblocking;
     483                 : 
     484                 :             /*
     485                 :              * If there are already records or an error queued up that could
     486                 :              * be replayed, we don't want to block here.  Otherwise, it's OK
     487                 :              * to block waiting for more data: presumably the caller has
     488                 :              * nothing else to do.
     489                 :              */
     490         2846473 :             nonblocking = XLogReaderHasQueuedRecordOrError(reader);
     491                 : 
     492                 :             /* Readahead is disabled until we replay past a certain point. */
     493         2846473 :             if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
     494          326360 :                 return LRQ_NEXT_AGAIN;
     495                 : 
     496         2520113 :             record = XLogReadAhead(prefetcher->reader, nonblocking);
     497         2520082 :             if (record == NULL)
     498                 :             {
     499                 :                 /*
     500                 :                  * We can't read any more, due to an error or lack of data in
     501                 :                  * nonblocking mode.  Don't try to read ahead again until
     502                 :                  * we've replayed everything already decoded.
     503                 :                  */
     504           13619 :                 if (nonblocking && prefetcher->reader->decode_queue_tail)
     505           13484 :                     prefetcher->no_readahead_until =
     506           13484 :                         prefetcher->reader->decode_queue_tail->lsn;
     507                 : 
     508           13619 :                 return LRQ_NEXT_AGAIN;
     509                 :             }
     510                 : 
     511                 :             /*
     512                 :              * If prefetching is disabled, we don't need to analyze the record
     513                 :              * or issue any prefetches.  We just need to cause one record to
     514                 :              * be decoded.
     515                 :              */
     516         2506463 :             if (!RecoveryPrefetchEnabled())
     517                 :             {
     518 UBC           0 :                 *lsn = InvalidXLogRecPtr;
     519               0 :                 return LRQ_NEXT_NO_IO;
     520                 :             }
     521                 : 
     522                 :             /* We have a new record to process. */
     523 CBC     2506463 :             prefetcher->record = record;
     524         2506463 :             prefetcher->next_block_id = 0;
     525                 :         }
     526                 :         else
     527                 :         {
     528                 :             /* Continue to process from last call, or last loop. */
     529         2654695 :             record = prefetcher->record;
     530                 :         }
     531                 : 
     532                 :         /*
     533                 :          * Check for operations that require us to filter out block ranges, or
     534                 :          * pause readahead completely.
     535                 :          */
     536         5161158 :         if (replaying_lsn < record->lsn)
     537                 :         {
     538         5161158 :             uint8       rmid = record->header.xl_rmid;
     539         5161158 :             uint8       record_type = record->header.xl_info & ~XLR_INFO_MASK;
     540                 : 
     541         5161158 :             if (rmid == RM_XLOG_ID)
     542                 :             {
     543           60464 :                 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
     544                 :                     record_type == XLOG_END_OF_RECOVERY)
     545                 :                 {
     546                 :                     /*
     547                 :                      * These records might change the TLI.  Avoid potential
     548                 :                      * bugs if we were to allow "read TLI" and "replay TLI" to
     549                 :                      * differ without more analysis.
     550                 :                      */
     551            2168 :                     prefetcher->no_readahead_until = record->lsn;
     552                 : 
     553                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     554                 :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     555                 :                          "suppressing all readahead until %X/%X is replayed due to possible TLI change",
     556                 :                          LSN_FORMAT_ARGS(record->lsn));
     557                 : #endif
     558                 : 
     559                 :                     /* Fall through so we move past this record. */
     560                 :                 }
     561                 :             }
     562         5100694 :             else if (rmid == RM_DBASE_ID)
     563                 :             {
     564                 :                 /*
     565                 :                  * When databases are created with the file-copy strategy,
     566                 :                  * there are no WAL records to tell us about the creation of
     567                 :                  * individual relations.
     568                 :                  */
     569              30 :                 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
     570                 :                 {
     571               3 :                     xl_dbase_create_file_copy_rec *xlrec =
     572                 :                     (xl_dbase_create_file_copy_rec *) record->main_data;
     573 GNC           3 :                     RelFileLocator rlocator =
     574               3 :                     {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
     575 ECB             : 
     576                 :                     /*
     577                 :                      * Don't try to prefetch anything in this database until
     578                 :                      * it has been created, or we might confuse the blocks of
     579                 :                      * different generations, if a database OID or
     580                 :                      * relfilenumber is reused.  It's also more efficient than
     581                 :                      * discovering that relations don't exist on disk yet with
     582                 :                      * ENOENT errors.
     583                 :                      */
     584 GNC           3 :                     XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
     585 ECB             : 
     586                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     587                 :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     588                 :                          "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
     589                 :                          rlocator.dbOid,
     590                 :                          LSN_FORMAT_ARGS(record->lsn));
     591                 : #endif
     592                 :                 }
     593                 :             }
     594 GIC     5100664 :             else if (rmid == RM_SMGR_ID)
     595 ECB             :             {
     596 GIC       12961 :                 if (record_type == XLOG_SMGR_CREATE)
     597 ECB             :                 {
     598 GIC       12919 :                     xl_smgr_create *xlrec = (xl_smgr_create *)
     599 ECB             :                     record->main_data;
     600                 : 
     601 GIC       12919 :                     if (xlrec->forkNum == MAIN_FORKNUM)
     602 ECB             :                     {
     603                 :                         /*
     604                 :                          * Don't prefetch anything for this whole relation
     605                 :                          * until it has been created.  Otherwise we might
     606                 :                          * confuse the blocks of different generations, if a
     607                 :                          * relfilenumber is reused.  This also avoids the need
     608                 :                          * to discover the problem via extra syscalls that
     609                 :                          * report ENOENT.
     610                 :                          */
     611 GNC       11538 :                         XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
     612 ECB             :                                                 record->lsn);
     613                 : 
     614                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     615                 :                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     616                 :                              "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
     617                 :                              xlrec->rlocator.spcOid,
     618                 :                              xlrec->rlocator.dbOid,
     619                 :                              xlrec->rlocator.relNumber,
     620                 :                              LSN_FORMAT_ARGS(record->lsn));
     621                 : #endif
     622                 :                     }
     623                 :                 }
     624 GIC          42 :                 else if (record_type == XLOG_SMGR_TRUNCATE)
     625 ECB             :                 {
     626 GIC          42 :                     xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
     627 ECB             :                     record->main_data;
     628                 : 
     629                 :                     /*
     630                 :                      * Don't consider prefetching anything in the truncated
     631                 :                      * range until the truncation has been performed.
     632                 :                      */
     633 GNC          42 :                     XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
     634 ECB             :                                             xlrec->blkno,
     635                 :                                             record->lsn);
     636                 : 
     637                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     638                 :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     639                 :                          "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
     640                 :                          xlrec->rlocator.spcOid,
     641                 :                          xlrec->rlocator.dbOid,
     642                 :                          xlrec->rlocator.relNumber,
     643                 :                          xlrec->blkno,
     644                 :                          LSN_FORMAT_ARGS(record->lsn));
     645                 : #endif
     646                 :                 }
     647                 :             }
     648                 :         }
     649                 : 
     650                 :         /* Scan the block references, starting where we left off last time. */
     651 GIC     5163299 :         while (prefetcher->next_block_id <= record->max_block_id)
     652 ECB             :         {
     653 GIC     2656836 :             int         block_id = prefetcher->next_block_id++;
     654 CBC     2656836 :             DecodedBkpBlock *block = &record->blocks[block_id];
     655 ECB             :             SMgrRelation reln;
     656                 :             PrefetchBufferResult result;
     657                 : 
     658 GIC     2656836 :             if (!block->in_use)
     659 CBC        1983 :                 continue;
     660 ECB             : 
     661 GNC     2654853 :             Assert(!BufferIsValid(block->prefetch_buffer));
     662 ECB             : 
     663                 :             /*
     664                 :              * Record the LSN of this record.  When it's replayed,
     665                 :              * LsnReadQueue will consider any IOs submitted for earlier LSNs
     666                 :              * to be finished.
     667                 :              */
     668 GIC     2654853 :             *lsn = record->lsn;
     669 ECB             : 
     670                 :             /* We don't try to prefetch anything but the main fork for now. */
     671 GIC     2654853 :             if (block->forknum != MAIN_FORKNUM)
     672 ECB             :             {
     673 GIC     2654695 :                 return LRQ_NEXT_NO_IO;
     674 ECB             :             }
     675                 : 
     676                 :             /*
     677                 :              * If there is a full page image attached, we won't be reading the
     678                 :              * page, so don't bother trying to prefetch.
     679                 :              */
     680 GIC     2647515 :             if (block->has_image)
     681 ECB             :             {
     682 GIC       32632 :                 XLogPrefetchIncrement(&SharedStats->skip_fpw);
     683 CBC       32632 :                 return LRQ_NEXT_NO_IO;
     684 ECB             :             }
     685                 : 
     686                 :             /* There is no point in reading a page that will be zeroed. */
     687 GIC     2614883 :             if (block->flags & BKPBLOCK_WILL_INIT)
     688 ECB             :             {
     689 GIC       50233 :                 XLogPrefetchIncrement(&SharedStats->skip_init);
     690 CBC       50233 :                 return LRQ_NEXT_NO_IO;
     691 ECB             :             }
     692                 : 
     693                 :             /* Should we skip prefetching this block due to a filter? */
     694 GNC     2564650 :             if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
     695 ECB             :             {
     696 GIC      315982 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     697 CBC      315982 :                 return LRQ_NEXT_NO_IO;
     698 ECB             :             }
     699                 : 
     700                 :             /* There is no point in repeatedly prefetching the same block. */
     701 GIC     6806864 :             for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
     702 ECB             :             {
     703 GIC     6329456 :                 if (block->blkno == prefetcher->recent_block[i] &&
     704 GNC     1875837 :                     RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
     705 ECB             :                 {
     706                 :                     /*
     707                 :                      * XXX If we also remembered where it was, we could set
     708                 :                      * recent_buffer so that recovery could skip smgropen()
     709                 :                      * and a buffer table lookup.
     710                 :                      */
     711 GIC     1771260 :                     XLogPrefetchIncrement(&SharedStats->skip_rep);
     712 CBC     1771260 :                     return LRQ_NEXT_NO_IO;
     713 ECB             :                 }
     714                 :             }
     715 GNC      477408 :             prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
     716 CBC      477408 :             prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
     717          477408 :             prefetcher->recent_idx =
     718          477408 :                 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
     719 ECB             : 
     720                 :             /*
     721                 :              * We could try to have a fast path for repeated references to the
     722                 :              * same relation (with some scheme to handle invalidations
     723                 :              * safely), but for now we'll call smgropen() every time.
     724                 :              */
     725 GNC      477408 :             reln = smgropen(block->rlocator, InvalidBackendId);
     726 ECB             : 
     727                 :             /*
     728                 :              * If the relation file doesn't exist on disk, for example because
     729                 :              * we're replaying after a crash and the file will be created and
     730                 :              * then unlinked by WAL that hasn't been replayed yet, suppress
     731                 :              * further prefetching in the relation until this record is
     732                 :              * replayed.
     733                 :              */
     734 GIC      477408 :             if (!smgrexists(reln, MAIN_FORKNUM))
     735 ECB             :             {
     736                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     737                 :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     738                 :                      "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
     739                 :                      reln->smgr_rlocator.locator.spcOid,
     740                 :                      reln->smgr_rlocator.locator.dbOid,
     741                 :                      reln->smgr_rlocator.locator.relNumber,
     742                 :                      LSN_FORMAT_ARGS(record->lsn));
     743                 : #endif
     744 UNC           0 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
     745 EUB             :                                         record->lsn);
     746 UIC           0 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     747 UBC           0 :                 return LRQ_NEXT_NO_IO;
     748 EUB             :             }
     749                 : 
     750                 :             /*
     751                 :              * If the relation isn't big enough to contain the referenced
     752                 :              * block yet, suppress prefetching of this block and higher until
     753                 :              * this record is replayed.
     754                 :              */
     755 GIC      477408 :             if (block->blkno >= smgrnblocks(reln, block->forknum))
     756 ECB             :             {
     757                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     758                 :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     759                 :                      "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
     760                 :                      reln->smgr_rlocator.locator.spcOid,
     761                 :                      reln->smgr_rlocator.locator.dbOid,
     762                 :                      reln->smgr_rlocator.locator.relNumber,
     763                 :                      block->blkno,
     764                 :                      LSN_FORMAT_ARGS(record->lsn));
     765                 : #endif
     766 GNC       12988 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
     767 ECB             :                                         record->lsn);
     768 GIC       12988 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     769 CBC       12988 :                 return LRQ_NEXT_NO_IO;
     770 ECB             :             }
     771                 : 
     772                 :             /* Try to initiate prefetching. */
     773 GIC      464420 :             result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
     774 CBC      464420 :             if (BufferIsValid(result.recent_buffer))
     775 ECB             :             {
     776                 :                 /* Cache hit, nothing to do. */
     777 GIC      423615 :                 XLogPrefetchIncrement(&SharedStats->hit);
     778 CBC      423615 :                 block->prefetch_buffer = result.recent_buffer;
     779          423615 :                 return LRQ_NEXT_NO_IO;
     780 ECB             :             }
     781 GIC       40805 :             else if (result.initiated_io)
     782 ECB             :             {
     783                 :                 /* Cache miss, I/O (presumably) started. */
     784 GIC       40647 :                 XLogPrefetchIncrement(&SharedStats->prefetch);
     785 CBC       40647 :                 block->prefetch_buffer = InvalidBuffer;
     786           40647 :                 return LRQ_NEXT_IO;
     787 ECB             :             }
     788 GNC         158 :             else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
     789 ECB             :             {
     790                 :                 /*
     791                 :                  * This shouldn't be possible, because we already determined
     792                 :                  * that the relation exists on disk and is big enough.
     793                 :                  * Something is wrong with the cache invalidation for
     794                 :                  * smgrexists(), smgrnblocks(), or the file was unlinked or
     795                 :                  * truncated beneath our feet?
     796                 :                  */
     797 UIC           0 :                 elog(ERROR,
     798 EUB             :                      "could not prefetch relation %u/%u/%u block %u",
     799                 :                      reln->smgr_rlocator.locator.spcOid,
     800                 :                      reln->smgr_rlocator.locator.dbOid,
     801                 :                      reln->smgr_rlocator.locator.relNumber,
     802                 :                      block->blkno);
     803                 :             }
     804                 :         }
     805                 : 
     806                 :         /*
     807                 :          * Several callsites need to be able to read exactly one record
     808                 :          * without any internal readahead.  Examples: xlog.c reading
     809                 :          * checkpoint records with emode set to PANIC, which might otherwise
     810                 :          * cause XLogPageRead() to panic on some future page, and xlog.c
     811                 :          * determining where to start writing WAL next, which depends on the
     812                 :          * contents of the reader's internal buffer after reading one record.
     813                 :          * Therefore, don't even think about prefetching until the first
     814                 :          * record after XLogPrefetcherBeginRead() has been consumed.
     815                 :          */
     816 GIC     2506463 :         if (prefetcher->reader->decode_queue_tail &&
     817 CBC     2506463 :             prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
     818            2448 :             return LRQ_NEXT_AGAIN;
     819 ECB             : 
     820                 :         /* Advance to the next record. */
     821 GIC     2504015 :         prefetcher->record = NULL;
     822 ECB             :     }
     823                 :     pg_unreachable();
     824                 : }
     825                 : 
     826                 : /*
     827                 :  * Expose statistics about recovery prefetching.
     828                 :  */
     829                 : Datum
     830 UIC           0 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
     831 EUB             : {
     832                 : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
     833 UIC           0 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     834 EUB             :     Datum       values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     835                 :     bool        nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     836                 : 
     837 UIC           0 :     InitMaterializedSRF(fcinfo, 0);
     838 EUB             : 
     839 UIC           0 :     for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
     840 UBC           0 :         nulls[i] = false;
     841 EUB             : 
     842 UIC           0 :     values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
     843 UBC           0 :     values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
     844               0 :     values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
     845               0 :     values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
     846               0 :     values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
     847               0 :     values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
     848               0 :     values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
     849               0 :     values[7] = Int32GetDatum(SharedStats->wal_distance);
     850               0 :     values[8] = Int32GetDatum(SharedStats->block_distance);
     851               0 :     values[9] = Int32GetDatum(SharedStats->io_depth);
     852               0 :     tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     853 EUB             : 
     854 UIC           0 :     return (Datum) 0;
     855 EUB             : }
     856                 : 
     857                 : /*
     858                 :  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
     859                 :  * has been replayed.
     860                 :  */
     861                 : static inline void
     862 GNC       24571 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     863 ECB             :                         BlockNumber blockno, XLogRecPtr lsn)
     864                 : {
     865                 :     XLogPrefetcherFilter *filter;
     866                 :     bool        found;
     867                 : 
     868 GNC       24571 :     filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
     869 CBC       24571 :     if (!found)
     870 ECB             :     {
     871                 :         /*
     872                 :          * Don't allow any prefetching of this block or higher until replayed.
     873                 :          */
     874 GIC       24556 :         filter->filter_until_replayed = lsn;
     875 CBC       24556 :         filter->filter_from_block = blockno;
     876           24556 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     877 ECB             :     }
     878                 :     else
     879                 :     {
     880                 :         /*
     881                 :          * We were already filtering this rlocator.  Extend the filter's
     882                 :          * lifetime to cover this WAL record, but leave the lower of the block
     883                 :          * numbers there because we don't want to have to track individual
     884                 :          * blocks.
     885                 :          */
     886 GIC          15 :         filter->filter_until_replayed = lsn;
     887              15 :         dlist_delete(&filter->link);
     888 CBC          15 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     889              15 :         filter->filter_from_block = Min(filter->filter_from_block, blockno);
     890 ECB             :     }
     891 CBC       24571 : }
     892                 : 
     893 ECB             : /*
     894                 :  * Have we replayed any records that caused us to begin filtering a block
     895                 :  * range?  That means that relations should have been created, extended or
     896                 :  * dropped as required, so we can stop filtering out accesses to a given
     897                 :  * relfilenumber.
     898                 :  */
     899                 : static inline void
     900 GIC     2506683 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
     901                 : {
     902 CBC     2531239 :     while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     903                 :     {
     904          608883 :         XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
     905                 :                                                           link,
     906 ECB             :                                                           &prefetcher->filter_queue);
     907                 : 
     908 GIC      608883 :         if (filter->filter_until_replayed >= replaying_lsn)
     909          584327 :             break;
     910 ECB             : 
     911 CBC       24556 :         dlist_delete(&filter->link);
     912 GIC       24556 :         hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
     913 ECB             :     }
     914 CBC     2506683 : }
     915                 : 
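Because filters are always pushed onto the head of filter_queue with the LSN of the record currently being decoded, and WAL is decoded in LSN order, the tail holds the filter that expires first, so the loop above can stop at the first still-pending entry it meets. Below is a self-contained sketch of that drain using an invented demo_complete_filters() over a plain array (oldest LSN first) instead of the dlist/hash pair.

#include <stdint.h>
#include <stdio.h>

/*
 * until_lsn[] holds the pending filters' expiry LSNs, oldest (lowest) first,
 * mirroring a walk from the tail of filter_queue.  Entries whose creating
 * record has been replayed are dropped; the scan stops at the first entry
 * still waiting, just like the break above.  Returns the new count.
 */
static int
demo_complete_filters(uint64_t *until_lsn, int nfilters, uint64_t replaying_lsn)
{
    int         dropped = 0;

    while (dropped < nfilters && until_lsn[dropped] < replaying_lsn)
        dropped++;

    for (int i = dropped; i < nfilters; i++)
        until_lsn[i - dropped] = until_lsn[i];
    return nfilters - dropped;
}

int
main(void)
{
    uint64_t    lsns[] = {100, 200, 300};

    /* Replay has reached LSN 250: the filters for 100 and 200 are released. */
    printf("%d filter(s) still active\n",
           demo_complete_filters(lsns, 3, 250));
    return 0;
}
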
     916 ECB             : /*
     917                 :  * Check if a given block should be skipped due to a filter.
     918                 :  */
     919                 : static inline bool
     920 GNC     2564650 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     921                 :                          BlockNumber blockno)
     922 ECB             : {
     923                 :     /*
     924                 :      * Test for empty queue first, because we expect it to be empty most of
     925                 :      * the time and we can avoid the hash table lookup in that case.
     926                 :      */
     927 GIC     2564650 :     if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     928                 :     {
     929 ECB             :         XLogPrefetcherFilter *filter;
     930                 : 
     931                 :         /* See if the block range is filtered. */
     932 GNC      583644 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     933 GIC      583644 :         if (filter && filter->filter_from_block <= blockno)
     934 ECB             :         {
     935                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     936                 :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     937                 :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
     938                 :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     939                 :                  LSN_FORMAT_ARGS(filter->filter_until_replayed),
     940                 :                  filter->filter_from_block);
     941                 : #endif
     942 GIC      315982 :             return true;
     943                 :         }
     944 ECB             : 
     945                 :         /* See if the whole database is filtered. */
     946 GNC      267662 :         rlocator.relNumber = InvalidRelFileNumber;
     947          267662 :         rlocator.spcOid = InvalidOid;
     948          267662 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     949 CBC      267662 :         if (filter)
     950 ECB             :         {
     951                 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     952                 :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     953                 :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
     954                 :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     955                 :                  LSN_FORMAT_ARGS(filter->filter_until_replayed));
     956                 : #endif
     957 UIC           0 :             return true;
     958                 :         }
     959 EUB             :     }
     960                 : 
     961 GIC     2248668 :     return false;
     962                 : }
     963 ECB             : 
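The function above makes at most two lookups: one keyed by the exact relation, which filters only blocks at or beyond filter_from_block, and a fallback with relNumber and spcOid invalidated, which is how a filter covering every relation of a database is keyed (registered elsewhere in this file for database-level WAL records). The sketch below is a self-contained illustration with invented names (demo_filter_key, demo_is_filtered), using 0 in place of InvalidRelFileNumber/InvalidOid and a linear scan in place of hash_search().

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Invented key: rel == 0 stands in for InvalidRelFileNumber/InvalidOid. */
typedef struct
{
    uint32_t    db;
    uint32_t    rel;            /* 0 means "every relation in db" */
    uint32_t    from_block;
} demo_filter_key;

/* Linear scan standing in for the hash_search(HASH_FIND) calls above. */
static const demo_filter_key *
demo_lookup(const demo_filter_key *filters, int n, uint32_t db, uint32_t rel)
{
    for (int i = 0; i < n; i++)
        if (filters[i].db == db && filters[i].rel == rel)
            return &filters[i];
    return NULL;
}

static bool
demo_is_filtered(const demo_filter_key *filters, int n,
                 uint32_t db, uint32_t rel, uint32_t blockno)
{
    const demo_filter_key *f;

    /* First lookup: this specific relation, for this block range. */
    f = demo_lookup(filters, n, db, rel);
    if (f && f->from_block <= blockno)
        return true;

    /* Second lookup: a filter covering the whole database. */
    return demo_lookup(filters, n, db, 0) != NULL;
}

int
main(void)
{
    /* One filter on relation 42 from block 10, one on all of database 7. */
    demo_filter_key filters[] = {{5, 42, 10}, {7, 0, 0}};

    printf("%d %d %d\n",
           demo_is_filtered(filters, 2, 5, 42, 3),     /* 0: below block 10 */
           demo_is_filtered(filters, 2, 5, 42, 12),    /* 1: in filtered range */
           demo_is_filtered(filters, 2, 7, 99, 0));    /* 1: whole database */
    return 0;
}
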
     964                 : /*
     965                 :  * A wrapper for XLogBeginRead() that also resets the prefetcher.
     966                 :  */
     967                 : void
     968 GIC        2448 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
     969                 : {
     970 ECB             :     /* This will forget about any in-flight IO. */
     971 GIC        2448 :     prefetcher->reconfigure_count--;
     972                 : 
     973 ECB             :     /* Book-keeping to avoid readahead on first read. */
     974 GIC        2448 :     prefetcher->begin_ptr = recPtr;
     975                 : 
     976 CBC        2448 :     prefetcher->no_readahead_until = 0;
     977                 : 
     978 ECB             :     /* This will forget about any queued up records in the decoder. */
     979 GIC        2448 :     XLogBeginRead(prefetcher->reader, recPtr);
     980            2448 : }
     981 ECB             : 
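Taken together with XLogPrefetcherReadRecord() below, the calling convention is meant to match a bare XLogReader: position once, then read in a loop. The fragment below only illustrates that pattern; demo_replay_from() is an invented name, the prefetcher is assumed to have been allocated elsewhere, and the usual backend headers and error handling are omitted, so this is a sketch rather than compilable standalone code.

/* Hypothetical caller; assumes an already-allocated prefetcher. */
static void
demo_replay_from(XLogPrefetcher *prefetcher, XLogRecPtr start)
{
    char       *errmsg = NULL;
    XLogRecord *record;

    /* Reposition the reader and discard any queued-up look-ahead state. */
    XLogPrefetcherBeginRead(prefetcher, start);

    /* Same loop shape as with XLogReadRecord(). */
    while ((record = XLogPrefetcherReadRecord(prefetcher, &errmsg)) != NULL)
    {
        /* ... apply the record ... */
    }

    /* NULL means no further record was available, or an error (see errmsg). */
}
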
     982                 : /*
     983                 :  * A wrapper for XLogReadRecord() that provides the same interface, but also
     984                 :  * tries to initiate I/O for blocks referenced in future WAL records.
     985                 :  */
     986                 : XLogRecord *
     987 GIC     2506683 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
     988                 : {
     989 ECB             :     DecodedXLogRecord *record;
     990                 :     XLogRecPtr  replayed_up_to;
     991                 : 
     992                 :     /*
     993                 :      * See if it's time to reset the prefetching machinery, because a relevant
     994                 :      * GUC was changed.
     995                 :      */
     996 GIC     2506683 :     if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
     997                 :     {
     998 ECB             :         uint32      max_distance;
     999                 :         uint32      max_inflight;
    1000                 : 
    1001 GIC        2457 :         if (prefetcher->streaming_read)
    1002            1281 :             lrq_free(prefetcher->streaming_read);
    1003 ECB             : 
    1004 CBC        2457 :         if (RecoveryPrefetchEnabled())
    1005                 :         {
    1006            2457 :             Assert(maintenance_io_concurrency > 0);
    1007 GIC        2457 :             max_inflight = maintenance_io_concurrency;
    1008 CBC        2457 :             max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
    1009 ECB             :         }
    1010                 :         else
    1011                 :         {
    1012 UIC           0 :             max_inflight = 1;
    1013               0 :             max_distance = 1;
    1014 EUB             :         }
    1015                 : 
    1016 GIC        2457 :         prefetcher->streaming_read = lrq_alloc(max_distance,
    1017                 :                                                max_inflight,
    1018 ECB             :                                                (uintptr_t) prefetcher,
    1019                 :                                                XLogPrefetcherNextBlock);
    1020                 : 
    1021 GIC        2457 :         prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    1022                 :     }
    1023 ECB             : 
    1024                 :     /*
    1025                 :      * Release last returned record, if there is one, as it's now been
    1026                 :      * replayed.
    1027                 :      */
    1028 GIC     2506683 :     replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
    1029                 : 
    1030 ECB             :     /*
    1031                 :      * Can we drop any filters yet?  If we were waiting for a relation to be
    1032                 :      * created or extended, it is now OK to access blocks in the covered
    1033                 :      * range.
    1034                 :      */
    1035 GIC     2506683 :     XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
    1036                 : 
    1037 ECB             :     /*
    1038                 :      * All IO initiated by earlier WAL is now completed.  This might trigger
    1039                 :      * further prefetching.
    1040                 :      */
    1041 GIC     2506683 :     lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
    1042                 : 
    1043 ECB             :     /*
    1044                 :      * If there's nothing queued yet, then start prefetching to cause at least
    1045                 :      * one record to be queued.
    1046                 :      */
    1047 GIC     2506652 :     if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    1048                 :     {
    1049 CBC          14 :         Assert(lrq_inflight(prefetcher->streaming_read) == 0);
    1050 GIC          14 :         Assert(lrq_completed(prefetcher->streaming_read) == 0);
    1051 CBC          14 :         lrq_prefetch(prefetcher->streaming_read);
    1052 ECB             :     }
    1053                 : 
    1054                 :     /* Read the next record. */
    1055 GIC     2506652 :     record = XLogNextRecord(prefetcher->reader, errmsg);
    1056         2506652 :     if (!record)
    1057 CBC         195 :         return NULL;
    1058 ECB             : 
    1059                 :     /*
    1060                 :      * The record we just got is the "current" one, for the benefit of the
    1061                 :      * XLogRecXXX() macros.
    1062                 :      */
    1063 GIC     2506457 :     Assert(record == prefetcher->reader->record);
    1064                 : 
    1065 ECB             :     /*
    1066                 :      * If maintenance_io_concurrency is set very low, we might have started
    1067                 :      * prefetching some but not all of the blocks referenced in the record
    1068                 :      * we're about to return.  Forget about the rest of the blocks in this
    1069                 :      * record by dropping the prefetcher's reference to it.
    1070                 :      */
    1071 GIC     2506457 :     if (record == prefetcher->record)
    1072            2448 :         prefetcher->record = NULL;
    1073 ECB             : 
    1074                 :     /*
    1075                 :      * See if it's time to compute some statistics, because enough WAL has
    1076                 :      * been processed.
    1077                 :      */
    1078 GIC     2506457 :     if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
    1079           43269 :         XLogPrefetcherComputeStats(prefetcher);
    1080 ECB             : 
    1081 CBC     2506457 :     Assert(record == prefetcher->reader->record);
    1082                 : 
    1083         2506457 :     return &record->header;
    1084                 : }
    1085 ECB             : 
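The lazy reset at the top of the function relies on a generation counter: the global XLogPrefetchReconfigureCount is bumped when a relevant setting changes (via XLogPrefetchReconfigure(), called from the assign hook below), and each read compares it with the prefetcher's cached copy, rebuilding the streaming-read state only on mismatch. XLogPrefetcherBeginRead() above gets a guaranteed reset simply by decrementing its cached copy. A self-contained sketch of that pattern, with invented names:

#include <stdio.h>

/* Invented stand-in for XLogPrefetchReconfigureCount. */
static int demo_reconfigure_count = 0;

typedef struct
{
    int         cached_count;   /* last generation this prefetcher saw */
    /* ... streaming-read state would live here ... */
} demo_prefetcher;

static void
demo_read_record(demo_prefetcher *p)
{
    if (p->cached_count != demo_reconfigure_count)
    {
        /* Equivalent of freeing and re-allocating the streaming read. */
        printf("rebuilding look-ahead state\n");
        p->cached_count = demo_reconfigure_count;
    }
    /* ... normal read path ... */
}

static void
demo_begin_read(demo_prefetcher *p)
{
    /* Decrement forces a mismatch, hence a reset on the next read. */
    p->cached_count--;
}

int
main(void)
{
    demo_prefetcher p = {0};

    demo_begin_read(&p);        /* repositioning... */
    demo_read_record(&p);       /* ...so this read rebuilds */
    demo_read_record(&p);       /* no rebuild */
    demo_reconfigure_count++;   /* e.g. a relevant setting changed */
    demo_read_record(&p);       /* rebuilds again */
    return 0;
}
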
    1086                 : bool
    1087 GIC        1857 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
    1088                 : {
    1089 ECB             : #ifndef USE_PREFETCH
    1090                 :     if (*new_value == RECOVERY_PREFETCH_ON)
    1091                 :     {
    1092                 :         GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
    1093                 :         return false;
    1094                 :     }
    1095                 : #endif
    1096                 : 
    1097 GIC        1857 :     return true;
    1098                 : }
    1099 ECB             : 
    1100                 : void
    1101 GIC        1857 : assign_recovery_prefetch(int new_value, void *extra)
    1102                 : {
    1103 ECB             :     /* Reconfigure prefetching, because a setting it depends on changed. */
    1104 GIC        1857 :     recovery_prefetch = new_value;
    1105            1857 :     if (AmStartupProcess())
    1106 LBC           0 :         XLogPrefetchReconfigure();
    1107 CBC        1857 : }
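
The two hooks split responsibilities in the usual GUC fashion: the check hook only rejects values the build cannot honour (recovery_prefetch = on without posix_fadvise()), while the assign hook stores the value and triggers a reconfigure, and only in the startup process, presumably because that is the process driving recovery. The sketch below is not the GUC API; it is a self-contained illustration of that check/assign split with invented names (demo_check_prefetch, demo_assign_prefetch) and a fake feature macro.

#include <stdbool.h>
#include <stdio.h>

#define DEMO_HAVE_PREFETCH 1    /* pretend posix_fadvise() is available */

enum {DEMO_PREFETCH_OFF, DEMO_PREFETCH_ON, DEMO_PREFETCH_TRY};

static int  demo_recovery_prefetch = DEMO_PREFETCH_TRY;
static bool demo_am_startup = true;

/* Check phase: validate only; reject "on" if the build can't prefetch. */
static bool
demo_check_prefetch(int new_value)
{
#if !DEMO_HAVE_PREFETCH
    if (new_value == DEMO_PREFETCH_ON)
        return false;
#endif
    return true;
}

/* Assign phase: store the value; only the startup process reconfigures. */
static void
demo_assign_prefetch(int new_value)
{
    demo_recovery_prefetch = new_value;
    if (demo_am_startup)
        printf("reconfigure prefetching (generation bump)\n");
}

int
main(void)
{
    if (demo_check_prefetch(DEMO_PREFETCH_ON))
        demo_assign_prefetch(DEMO_PREFETCH_ON);
    printf("recovery_prefetch = %d\n", demo_recovery_prefetch);
    return 0;
}
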
        

Generated by: LCOV version v1.16-55-g56c0a2a