LCOV - differential code coverage report
Current view: top level - src/backend/access/transam - xloginsert.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 84.4 % 423 357 5 11 40 10 13 166 31 147 40 187 3 12
Current Date: 2023-04-08 15:15:32 Functions: 94.4 % 18 17 1 7 7 3 1 11 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * xloginsert.c
       4                 :  *      Functions for constructing WAL records
       5                 :  *
       6                 :  * Constructing a WAL record begins with a call to XLogBeginInsert,
       7                 :  * followed by a number of XLogRegister* calls. The registered data is
       8                 :  * collected in private working memory, and finally assembled into a chain
       9                 :  * of XLogRecData structs by a call to XLogRecordAssemble(). See
      10                 :  * access/transam/README for details.
      11                 :  *
      12                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      13                 :  * Portions Copyright (c) 1994, Regents of the University of California
      14                 :  *
      15                 :  * src/backend/access/transam/xloginsert.c
      16                 :  *
      17                 :  *-------------------------------------------------------------------------
      18                 :  */
      19                 : 
      20                 : #include "postgres.h"
      21                 : 
      22                 : #ifdef USE_LZ4
      23                 : #include <lz4.h>
      24                 : #endif
      25                 : 
      26                 : #ifdef USE_ZSTD
      27                 : #include <zstd.h>
      28                 : #endif
      29                 : 
      30                 : #include "access/xact.h"
      31                 : #include "access/xlog.h"
      32                 : #include "access/xlog_internal.h"
      33                 : #include "access/xloginsert.h"
      34                 : #include "catalog/pg_control.h"
      35                 : #include "common/pg_lzcompress.h"
      36                 : #include "executor/instrument.h"
      37                 : #include "miscadmin.h"
      38                 : #include "pg_trace.h"
      39                 : #include "replication/origin.h"
      40                 : #include "storage/bufmgr.h"
      41                 : #include "storage/proc.h"
      42                 : #include "utils/memutils.h"
      43                 : 
      44                 : /*
      45                 :  * Guess the maximum buffer size required to store a compressed version of
      46                 :  * backup block image.
      47                 :  */
      48                 : #ifdef USE_LZ4
      49                 : #define LZ4_MAX_BLCKSZ      LZ4_COMPRESSBOUND(BLCKSZ)
      50                 : #else
      51                 : #define LZ4_MAX_BLCKSZ      0
      52                 : #endif
      53                 : 
      54                 : #ifdef USE_ZSTD
      55                 : #define ZSTD_MAX_BLCKSZ     ZSTD_COMPRESSBOUND(BLCKSZ)
      56                 : #else
      57                 : #define ZSTD_MAX_BLCKSZ     0
      58                 : #endif
      59                 : 
      60                 : #define PGLZ_MAX_BLCKSZ     PGLZ_MAX_OUTPUT(BLCKSZ)
      61                 : 
      62                 : /* Buffer size required to store a compressed version of backup block image */
      63                 : #define COMPRESS_BUFSIZE    Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
      64                 : 
      65                 : /*
      66                 :  * For each block reference registered with XLogRegisterBuffer, we fill in
      67                 :  * a registered_buffer struct.
      68                 :  */
      69                 : typedef struct
      70                 : {
      71                 :     bool        in_use;         /* is this slot in use? */
      72                 :     uint8       flags;          /* REGBUF_* flags */
      73                 :     RelFileLocator rlocator;    /* identifies the relation and block */
      74                 :     ForkNumber  forkno;
      75                 :     BlockNumber block;
      76                 :     Page        page;           /* page content */
      77                 :     uint32      rdata_len;      /* total length of data in rdata chain */
      78                 :     XLogRecData *rdata_head;    /* head of the chain of data registered with
      79                 :                                  * this block */
      80                 :     XLogRecData *rdata_tail;    /* last entry in the chain, or &rdata_head if
      81                 :                                  * empty */
      82                 : 
      83                 :     XLogRecData bkp_rdatas[2];  /* temporary rdatas used to hold references to
      84                 :                                  * backup block data in XLogRecordAssemble() */
      85                 : 
      86                 :     /* buffer to store a compressed version of backup block image */
      87                 :     char        compressed_page[COMPRESS_BUFSIZE];
      88                 : } registered_buffer;
      89                 : 
      90                 : static registered_buffer *registered_buffers;
      91                 : static int  max_registered_buffers; /* allocated size */
      92                 : static int  max_registered_block_id = 0;    /* highest block_id + 1 currently
      93                 :                                              * registered */
      94                 : 
      95                 : /*
      96                 :  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
      97                 :  * with XLogRegisterData(...).
      98                 :  */
      99                 : static XLogRecData *mainrdata_head;
     100                 : static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
     101                 : static uint64 mainrdata_len;    /* total # of bytes in chain */
     102                 : 
     103                 : /* flags for the in-progress insertion */
     104                 : static uint8 curinsert_flags = 0;
     105                 : 
     106                 : /*
     107                 :  * These are used to hold the record header while constructing a record.
     108                 :  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
     109                 :  * because we want it to be MAXALIGNed and padding bytes zeroed.
     110                 :  *
     111                 :  * For simplicity, it's allocated large enough to hold the headers for any
     112                 :  * WAL record.
     113                 :  */
     114                 : static XLogRecData hdr_rdt;
     115                 : static char *hdr_scratch = NULL;
     116                 : 
     117                 : #define SizeOfXlogOrigin    (sizeof(RepOriginId) + sizeof(char))
     118                 : #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
     119                 : 
     120                 : #define HEADER_SCRATCH_SIZE \
     121                 :     (SizeOfXLogRecord + \
     122                 :      MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
     123                 :      SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
     124                 :      SizeOfXLogTransactionId)
     125                 : 
     126                 : /*
     127                 :  * An array of XLogRecData structs, to hold registered data.
     128                 :  */
     129                 : static XLogRecData *rdatas;
     130                 : static int  num_rdatas;         /* entries currently used */
     131                 : static int  max_rdatas;         /* allocated size */
     132                 : 
     133                 : static bool begininsert_called = false;
     134                 : 
     135                 : /* Memory context to hold the registered buffer and data references. */
     136                 : static MemoryContext xloginsert_cxt;
     137                 : 
     138                 : static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
     139                 :                                        XLogRecPtr RedoRecPtr, bool doPageWrites,
     140                 :                                        XLogRecPtr *fpw_lsn, int *num_fpi,
     141                 :                                        bool *topxid_included);
     142                 : static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
     143                 :                                     uint16 hole_length, char *dest, uint16 *dlen);
     144                 : 
     145                 : /*
     146                 :  * Begin constructing a WAL record. This must be called before the
     147                 :  * XLogRegister* functions and XLogInsert().
     148                 :  */
     149                 : void
     150 CBC    23120097 : XLogBeginInsert(void)
     151                 : {
     152        23120097 :     Assert(max_registered_block_id == 0);
     153        23120097 :     Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
     154        23120097 :     Assert(mainrdata_len == 0);
     155                 : 
     156                 :     /* cross-check on whether we should be here or not */
     157        23120097 :     if (!XLogInsertAllowed())
     158 UBC           0 :         elog(ERROR, "cannot make new WAL entries during recovery");
     159                 : 
     160 CBC    23120097 :     if (begininsert_called)
     161 UBC           0 :         elog(ERROR, "XLogBeginInsert was already called");
     162                 : 
     163 CBC    23120097 :     begininsert_called = true;
     164        23120097 : }
     165                 : 
     166                 : /*
     167                 :  * Ensure that there are enough buffer and data slots in the working area,
     168                 :  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
     169                 :  * calls.
     170                 :  *
     171                 :  * There is always space for a small number of buffers and data chunks, enough
     172                 :  * for most record types. This function is for the exceptional cases that need
     173                 :  * more.
     174                 :  */
     175                 : void
     176           33860 : XLogEnsureRecordSpace(int max_block_id, int ndatas)
     177                 : {
     178                 :     int         nbuffers;
     179                 : 
     180                 :     /*
     181                 :      * This must be called before entering a critical section, because
     182                 :      * allocating memory inside a critical section can fail. repalloc() will
     183                 :      * check the same, but better to check it here too so that we fail
     184                 :      * consistently even if the arrays happen to be large enough already.
     185                 :      */
     186           33860 :     Assert(CritSectionCount == 0);
     187                 : 
     188                 :     /* the minimum values can't be decreased */
     189           33860 :     if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
     190            2084 :         max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
     191           33860 :     if (ndatas < XLR_NORMAL_RDATAS)
     192           33846 :         ndatas = XLR_NORMAL_RDATAS;
     193                 : 
     194           33860 :     if (max_block_id > XLR_MAX_BLOCK_ID)
     195 UBC           0 :         elog(ERROR, "maximum number of WAL record block references exceeded");
     196 CBC       33860 :     nbuffers = max_block_id + 1;
     197                 : 
     198           33860 :     if (nbuffers > max_registered_buffers)
     199                 :     {
     200             500 :         registered_buffers = (registered_buffer *)
     201             500 :             repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
     202                 : 
     203                 :         /*
     204                 :          * At least the padding bytes in the structs must be zeroed, because
     205                 :          * they are included in WAL data, but initialize it all for tidiness.
     206                 :          */
     207             500 :         MemSet(&registered_buffers[max_registered_buffers], 0,
     208                 :                (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
     209             500 :         max_registered_buffers = nbuffers;
     210                 :     }
     211                 : 
     212           33860 :     if (ndatas > max_rdatas)
     213                 :     {
     214              12 :         rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
     215              12 :         max_rdatas = ndatas;
     216                 :     }
     217           33860 : }
     218                 : 
     219                 : /*
     220                 :  * Reset WAL record construction buffers.
     221                 :  */
     222                 : void
     223        23144663 : XLogResetInsertion(void)
     224                 : {
     225                 :     int         i;
     226                 : 
     227        46358906 :     for (i = 0; i < max_registered_block_id; i++)
     228        23214243 :         registered_buffers[i].in_use = false;
     229                 : 
     230        23144663 :     num_rdatas = 0;
     231        23144663 :     max_registered_block_id = 0;
     232        23144663 :     mainrdata_len = 0;
     233        23144663 :     mainrdata_last = (XLogRecData *) &mainrdata_head;
     234        23144663 :     curinsert_flags = 0;
     235        23144663 :     begininsert_called = false;
     236        23144663 : }
     237                 : 
     238                 : /*
     239                 :  * Register a reference to a buffer with the WAL record being constructed.
     240                 :  * This must be called for every page that the WAL-logged operation modifies.
     241                 :  */
     242                 : void
     243        22971600 : XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
     244                 : {
     245                 :     registered_buffer *regbuf;
     246                 : 
     247                 :     /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
     248        22971600 :     Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
     249        22971600 :     Assert(begininsert_called);
     250                 : 
     251        22971600 :     if (block_id >= max_registered_block_id)
     252                 :     {
     253        22616494 :         if (block_id >= max_registered_buffers)
     254 UBC           0 :             elog(ERROR, "too many registered buffers");
     255 CBC    22616494 :         max_registered_block_id = block_id + 1;
     256                 :     }
     257                 : 
     258        22971600 :     regbuf = &registered_buffers[block_id];
     259                 : 
     260 GNC    22971600 :     BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
     261 CBC    22971600 :     regbuf->page = BufferGetPage(buffer);
     262        22971600 :     regbuf->flags = flags;
     263        22971600 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     264        22971600 :     regbuf->rdata_len = 0;
     265                 : 
     266                 :     /*
     267                 :      * Check that this page hasn't already been registered with some other
     268                 :      * block_id.
     269                 :      */
     270                 : #ifdef USE_ASSERT_CHECKING
     271                 :     {
     272                 :         int         i;
     273                 : 
     274        47799162 :         for (i = 0; i < max_registered_block_id; i++)
     275                 :         {
     276        24827562 :             registered_buffer *regbuf_old = &registered_buffers[i];
     277                 : 
     278        24827562 :             if (i == block_id || !regbuf_old->in_use)
     279        23344453 :                 continue;
     280                 : 
     281 GNC     1483109 :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     282                 :                    regbuf_old->forkno != regbuf->forkno ||
     283                 :                    regbuf_old->block != regbuf->block);
     284                 :         }
     285                 :     }
     286                 : #endif
     287                 : 
     288 CBC    22971600 :     regbuf->in_use = true;
     289        22971600 : }
     290                 : 
     291                 : /*
     292                 :  * Like XLogRegisterBuffer, but for registering a block that's not in the
     293                 :  * shared buffer pool (i.e. when you don't have a Buffer for it).
     294                 :  */
     295                 : void
     296 GNC      227106 : XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
     297                 :                   BlockNumber blknum, Page page, uint8 flags)
     298                 : {
     299                 :     registered_buffer *regbuf;
     300                 : 
     301 CBC      227106 :     Assert(begininsert_called);
     302                 : 
     303          227106 :     if (block_id >= max_registered_block_id)
     304          227106 :         max_registered_block_id = block_id + 1;
     305                 : 
     306          227106 :     if (block_id >= max_registered_buffers)
     307 UBC           0 :         elog(ERROR, "too many registered buffers");
     308                 : 
     309 CBC      227106 :     regbuf = &registered_buffers[block_id];
     310                 : 
     311 GNC      227106 :     regbuf->rlocator = *rlocator;
     312 CBC      227106 :     regbuf->forkno = forknum;
     313          227106 :     regbuf->block = blknum;
     314          227106 :     regbuf->page = page;
     315          227106 :     regbuf->flags = flags;
     316          227106 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     317          227106 :     regbuf->rdata_len = 0;
     318                 : 
     319                 :     /*
     320                 :      * Check that this page hasn't already been registered with some other
     321                 :      * block_id.
     322                 :      */
     323                 : #ifdef USE_ASSERT_CHECKING
     324                 :     {
     325                 :         int         i;
     326                 : 
     327          460767 :         for (i = 0; i < max_registered_block_id; i++)
     328                 :         {
     329          233661 :             registered_buffer *regbuf_old = &registered_buffers[i];
     330                 : 
     331          233661 :             if (i == block_id || !regbuf_old->in_use)
     332          227106 :                 continue;
     333                 : 
     334 GNC        6555 :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     335                 :                    regbuf_old->forkno != regbuf->forkno ||
     336                 :                    regbuf_old->block != regbuf->block);
     337                 :         }
     338                 :     }
     339                 : #endif
     340                 : 
     341 CBC      227106 :     regbuf->in_use = true;
     342          227106 : }
     343                 : 
     344                 : /*
     345                 :  * Add data to the WAL record that's being constructed.
     346                 :  *
     347                 :  * The data is appended to the "main chunk", available at replay with
     348                 :  * XLogRecGetData().
     349                 :  */
     350                 : void
     351 GNC    24364384 : XLogRegisterData(char *data, uint32 len)
     352                 : {
     353                 :     XLogRecData *rdata;
     354                 : 
     355 CBC    24364384 :     Assert(begininsert_called);
     356                 : 
     357        24364384 :     if (num_rdatas >= max_rdatas)
     358 UNC           0 :         ereport(ERROR,
     359                 :                 (errmsg_internal("too much WAL data"),
     360                 :                  errdetail_internal("%u out of %u data segments are already in use.",
     361                 :                                     num_rdatas, max_rdatas)));
     362 GIC    24364384 :     rdata = &rdatas[num_rdatas++];
     363                 : 
     364        24364384 :     rdata->data = data;
     365 CBC    24364384 :     rdata->len = len;
     366                 : 
     367 ECB             :     /*
     368                 :      * we use the mainrdata_last pointer to track the end of the chain, so no
     369                 :      * need to clear 'next' here.
     370                 :      */
     371                 : 
     372 GIC    24364384 :     mainrdata_last->next = rdata;
     373        24364384 :     mainrdata_last = rdata;
     374                 : 
     375 CBC    24364384 :     mainrdata_len += len;
     376        24364384 : }
     377                 : 
     378 ECB             : /*
     379                 :  * Add buffer-specific data to the WAL record that's being constructed.
     380                 :  *
     381                 :  * Block_id must reference a block previously registered with
     382                 :  * XLogRegisterBuffer(). If this is called more than once for the same
     383                 :  * block_id, the data is appended.
     384                 :  *
     385                 :  * The maximum amount of data that can be registered per block is 65535
     386                 :  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
     387                 :  * reconstruct the changes to the page, you might as well just log a full
     388                 :  * copy of it. (the "main data" that's not associated with a block is not
     389                 :  * limited)
     390                 :  */
     391                 : void
     392 GNC    32267949 : XLogRegisterBufData(uint8 block_id, char *data, uint32 len)
     393                 : {
     394                 :     registered_buffer *regbuf;
     395 ECB             :     XLogRecData *rdata;
     396                 : 
     397 GIC    32267949 :     Assert(begininsert_called);
     398                 : 
     399                 :     /* find the registered buffer struct */
     400 CBC    32267949 :     regbuf = &registered_buffers[block_id];
     401 GIC    32267949 :     if (!regbuf->in_use)
     402 UIC           0 :         elog(ERROR, "no block with id %d registered with WAL insertion",
     403 ECB             :              block_id);
     404                 : 
     405                 :     /*
     406                 :      * Check against max_rdatas and ensure we do not register more data per
     407                 :      * buffer than can be handled by the physical data format; i.e. that
     408                 :      * regbuf->rdata_len does not grow beyond what
     409                 :      * XLogRecordBlockHeader->data_length can hold.
     410                 :      */
     411 GBC    32267949 :     if (num_rdatas >= max_rdatas)
     412 UNC           0 :         ereport(ERROR,
     413                 :                 (errmsg_internal("too much WAL data"),
     414                 :                  errdetail_internal("%u out of %u data segments are already in use.",
     415                 :                                     num_rdatas, max_rdatas)));
     416 GNC    32267949 :     if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
     417 UNC           0 :         ereport(ERROR,
     418                 :                 (errmsg_internal("too much WAL data"),
     419                 :                  errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
     420                 :                                     UINT16_MAX, block_id, regbuf->rdata_len, len)));
     421                 : 
     422 GIC    32267949 :     rdata = &rdatas[num_rdatas++];
     423                 : 
     424        32267949 :     rdata->data = data;
     425        32267949 :     rdata->len = len;
     426                 : 
     427        32267949 :     regbuf->rdata_tail->next = rdata;
     428        32267949 :     regbuf->rdata_tail = rdata;
     429 CBC    32267949 :     regbuf->rdata_len += len;
     430 GBC    32267949 : }
     431                 : 
     432                 : /*
     433                 :  * Set insert status flags for the upcoming WAL record.
     434 ECB             :  *
     435 EUB             :  * The flags that can be used here are:
     436                 :  * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
     437                 :  *   included in the record.
     438                 :  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
     439                 :  *   durability, which allows to avoid triggering WAL archiving and other
     440 ECB             :  *   background activity.
     441                 :  */
     442                 : void
     443 CBC    13933043 : XLogSetRecordFlags(uint8 flags)
     444                 : {
     445        13933043 :     Assert(begininsert_called);
     446        13933043 :     curinsert_flags |= flags;
     447        13933043 : }
     448 ECB             : 
     449                 : /*
     450                 :  * Insert an XLOG record having the specified RMID and info bytes, with the
     451                 :  * body of the record being the data and buffer references registered earlier
     452                 :  * with XLogRegister* calls.
     453                 :  *
     454                 :  * Returns XLOG pointer to end of record (beginning of next record).
     455                 :  * This can be used as LSN for data pages affected by the logged action.
     456                 :  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
     457                 :  * before the data page can be written out.  This implements the basic
     458                 :  * WAL rule "write the log before the data".)
     459                 :  */
     460                 : XLogRecPtr
     461 CBC    23120097 : XLogInsert(RmgrId rmid, uint8 info)
     462                 : {
     463 ECB             :     XLogRecPtr  EndPos;
     464                 : 
     465                 :     /* XLogBeginInsert() must have been called. */
     466 GIC    23120097 :     if (!begininsert_called)
     467 UIC           0 :         elog(ERROR, "XLogBeginInsert was not called");
     468                 : 
     469                 :     /*
     470                 :      * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
     471                 :      * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
     472                 :      */
     473 GIC    23120097 :     if ((info & ~(XLR_RMGR_INFO_MASK |
     474                 :                   XLR_SPECIAL_REL_UPDATE |
     475                 :                   XLR_CHECK_CONSISTENCY)) != 0)
     476 UIC           0 :         elog(PANIC, "invalid xlog info mask %02X", info);
     477                 : 
     478                 :     TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
     479 ECB             : 
     480                 :     /*
     481                 :      * In bootstrap mode, we don't actually log anything but XLOG resources;
     482                 :      * return a phony record pointer.
     483                 :      */
     484 CBC    23120097 :     if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
     485 EUB             :     {
     486 GIC     3720695 :         XLogResetInsertion();
     487         3720695 :         EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
     488         3720695 :         return EndPos;
     489                 :     }
     490                 : 
     491 ECB             :     do
     492                 :     {
     493                 :         XLogRecPtr  RedoRecPtr;
     494 EUB             :         bool        doPageWrites;
     495 GIC    19404394 :         bool        topxid_included = false;
     496                 :         XLogRecPtr  fpw_lsn;
     497                 :         XLogRecData *rdt;
     498        19404394 :         int         num_fpi = 0;
     499                 : 
     500                 :         /*
     501                 :          * Get values needed to decide whether to do full-page writes. Since
     502 ECB             :          * we don't yet have an insertion lock, these could change under us,
     503                 :          * but XLogInsertRecord will recheck them once it has a lock.
     504                 :          */
     505 CBC    19404394 :         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
     506 ECB             : 
     507 GIC    19404394 :         rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
     508                 :                                  &fpw_lsn, &num_fpi, &topxid_included);
     509                 : 
     510        19404394 :         EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
     511                 :                                   topxid_included);
     512        19404394 :     } while (EndPos == InvalidXLogRecPtr);
     513 ECB             : 
     514 GIC    19399402 :     XLogResetInsertion();
     515                 : 
     516 CBC    19399402 :     return EndPos;
     517                 : }
     518                 : 
     519                 : /*
     520                 :  * Assemble a WAL record from the registered data and buffers into an
     521                 :  * XLogRecData chain, ready for insertion with XLogInsertRecord().
     522                 :  *
     523 ECB             :  * The record header fields are filled in, except for the xl_prev field. The
     524                 :  * calculated CRC does not include the record header yet.
     525                 :  *
     526                 :  * If there are any registered buffers, and a full-page image was not taken
     527                 :  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
     528                 :  * signals that the assembled record is only good for insertion on the
     529                 :  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
     530                 :  *
     531                 :  * *topxid_included is set if the topmost transaction ID is logged with the
     532                 :  * current subtransaction.
     533                 :  */
     534                 : static XLogRecData *
     535 GIC    19404394 : XLogRecordAssemble(RmgrId rmid, uint8 info,
     536                 :                    XLogRecPtr RedoRecPtr, bool doPageWrites,
     537                 :                    XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
     538                 : {
     539                 :     XLogRecData *rdt;
     540 GNC    19404394 :     uint64      total_len = 0;
     541                 :     int         block_id;
     542                 :     pg_crc32c   rdata_crc;
     543 GIC    19404394 :     registered_buffer *prev_regbuf = NULL;
     544                 :     XLogRecData *rdt_datas_last;
     545                 :     XLogRecord *rechdr;
     546        19404394 :     char       *scratch = hdr_scratch;
     547                 : 
     548                 :     /*
     549                 :      * Note: this function can be called multiple times for the same record.
     550                 :      * All the modifications we do to the rdata chains below must handle that.
     551                 :      */
     552                 : 
     553 ECB             :     /* The record begins with the fixed-size header */
     554 GIC    19404394 :     rechdr = (XLogRecord *) scratch;
     555        19404394 :     scratch += SizeOfXLogRecord;
     556                 : 
     557        19404394 :     hdr_rdt.next = NULL;
     558 CBC    19404394 :     rdt_datas_last = &hdr_rdt;
     559 GIC    19404394 :     hdr_rdt.data = hdr_scratch;
     560                 : 
     561 ECB             :     /*
     562                 :      * Enforce consistency checks for this record if user is looking for it.
     563                 :      * Do this before at the beginning of this routine to give the possibility
     564                 :      * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
     565                 :      * a record.
     566                 :      */
     567 GIC    19404394 :     if (wal_consistency_checking[rmid])
     568            3225 :         info |= XLR_CHECK_CONSISTENCY;
     569                 : 
     570                 :     /*
     571                 :      * Make an rdata chain containing all the data portions of all block
     572 ECB             :      * references. This includes the data for full-page images. Also append
     573                 :      * the headers for the block references in the scratch buffer.
     574                 :      */
     575 CBC    19404394 :     *fpw_lsn = InvalidXLogRecPtr;
     576        39039459 :     for (block_id = 0; block_id < max_registered_block_id; block_id++)
     577 ECB             :     {
     578 GIC    19635065 :         registered_buffer *regbuf = &registered_buffers[block_id];
     579                 :         bool        needs_backup;
     580                 :         bool        needs_data;
     581                 :         XLogRecordBlockHeader bkpb;
     582                 :         XLogRecordBlockImageHeader bimg;
     583        19635065 :         XLogRecordBlockCompressHeader cbimg = {0};
     584                 :         bool        samerel;
     585 CBC    19635065 :         bool        is_compressed = false;
     586 ECB             :         bool        include_image;
     587                 : 
     588 GIC    19635065 :         if (!regbuf->in_use)
     589           15537 :             continue;
     590                 : 
     591                 :         /* Determine if this block needs to be backed up */
     592        19619528 :         if (regbuf->flags & REGBUF_FORCE_IMAGE)
     593 CBC      314590 :             needs_backup = true;
     594        19304938 :         else if (regbuf->flags & REGBUF_NO_IMAGE)
     595 GIC      423687 :             needs_backup = false;
     596 CBC    18881251 :         else if (!doPageWrites)
     597 GIC      208913 :             needs_backup = false;
     598                 :         else
     599                 :         {
     600                 :             /*
     601 ECB             :              * We assume page LSN is first data on *every* page that can be
     602                 :              * passed to XLogInsert, whether it has the standard page layout
     603                 :              * or not.
     604                 :              */
     605 GIC    18672338 :             XLogRecPtr  page_lsn = PageGetLSN(regbuf->page);
     606 ECB             : 
     607 CBC    18672338 :             needs_backup = (page_lsn <= RedoRecPtr);
     608 GIC    18672338 :             if (!needs_backup)
     609                 :             {
     610 CBC    18592651 :                 if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
     611        18150756 :                     *fpw_lsn = page_lsn;
     612 ECB             :             }
     613                 :         }
     614                 : 
     615                 :         /* Determine if the buffer data needs to included */
     616 GIC    19619528 :         if (regbuf->rdata_len == 0)
     617         2972249 :             needs_data = false;
     618        16647279 :         else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
     619          389871 :             needs_data = true;
     620                 :         else
     621        16257408 :             needs_data = !needs_backup;
     622                 : 
     623 CBC    19619528 :         bkpb.id = block_id;
     624 GIC    19619528 :         bkpb.fork_flags = regbuf->forkno;
     625 CBC    19619528 :         bkpb.data_length = 0;
     626 ECB             : 
     627 GIC    19619528 :         if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
     628 CBC      277915 :             bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
     629 ECB             : 
     630                 :         /*
     631                 :          * If needs_backup is true or WAL checking is enabled for current
     632                 :          * resource manager, log a full-page write for the current block.
     633                 :          */
     634 CBC    19619528 :         include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
     635 ECB             : 
     636 CBC    19619528 :         if (include_image)
     637 ECB             :         {
     638 GIC      399497 :             Page        page = regbuf->page;
     639 CBC      399497 :             uint16      compressed_len = 0;
     640                 : 
     641 ECB             :             /*
     642                 :              * The page needs to be backed up, so calculate its hole length
     643                 :              * and offset.
     644                 :              */
     645 CBC      399497 :             if (regbuf->flags & REGBUF_STANDARD)
     646 ECB             :             {
     647                 :                 /* Assume we can omit data between pd_lower and pd_upper */
     648 GIC      302981 :                 uint16      lower = ((PageHeader) page)->pd_lower;
     649          302981 :                 uint16      upper = ((PageHeader) page)->pd_upper;
     650                 : 
     651          302981 :                 if (lower >= SizeOfPageHeaderData &&
     652 CBC      301878 :                     upper > lower &&
     653                 :                     upper <= BLCKSZ)
     654 ECB             :                 {
     655 GIC      301878 :                     bimg.hole_offset = lower;
     656 CBC      301878 :                     cbimg.hole_length = upper - lower;
     657 ECB             :                 }
     658                 :                 else
     659                 :                 {
     660                 :                     /* No "hole" to remove */
     661 GIC        1103 :                     bimg.hole_offset = 0;
     662            1103 :                     cbimg.hole_length = 0;
     663 ECB             :                 }
     664                 :             }
     665                 :             else
     666                 :             {
     667                 :                 /* Not a standard page header, don't try to eliminate "hole" */
     668 GIC       96516 :                 bimg.hole_offset = 0;
     669 CBC       96516 :                 cbimg.hole_length = 0;
     670 ECB             :             }
     671                 : 
     672                 :             /*
     673                 :              * Try to compress a block image if wal_compression is enabled
     674                 :              */
     675 GIC      399497 :             if (wal_compression != WAL_COMPRESSION_NONE)
     676                 :             {
     677                 :                 is_compressed =
     678 UIC           0 :                     XLogCompressBackupBlock(page, bimg.hole_offset,
     679 LBC           0 :                                             cbimg.hole_length,
     680               0 :                                             regbuf->compressed_page,
     681                 :                                             &compressed_len);
     682                 :             }
     683                 : 
     684                 :             /*
     685                 :              * Fill in the remaining fields in the XLogRecordBlockHeader
     686 ECB             :              * struct
     687                 :              */
     688 GIC      399497 :             bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
     689                 : 
     690                 :             /* Report a full page image constructed for the WAL record */
     691          399497 :             *num_fpi += 1;
     692                 : 
     693 ECB             :             /*
     694                 :              * Construct XLogRecData entries for the page content.
     695                 :              */
     696 GBC      399497 :             rdt_datas_last->next = &regbuf->bkp_rdatas[0];
     697          399497 :             rdt_datas_last = rdt_datas_last->next;
     698 EUB             : 
     699 GIC      399497 :             bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
     700                 : 
     701                 :             /*
     702                 :              * If WAL consistency checking is enabled for the resource manager
     703                 :              * of this WAL record, a full-page image is included in the record
     704                 :              * for the block modified. During redo, the full-page is replayed
     705                 :              * only if BKPIMAGE_APPLY is set.
     706 ECB             :              */
     707 GIC      399497 :             if (needs_backup)
     708          394277 :                 bimg.bimg_info |= BKPIMAGE_APPLY;
     709 ECB             : 
     710 GIC      399497 :             if (is_compressed)
     711                 :             {
     712                 :                 /* The current compression is stored in the WAL record */
     713 UIC           0 :                 bimg.length = compressed_len;
     714 ECB             : 
     715                 :                 /* Set the compression method used for this block */
     716 UIC           0 :                 switch ((WalCompression) wal_compression)
     717 ECB             :                 {
     718 UIC           0 :                     case WAL_COMPRESSION_PGLZ:
     719               0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
     720               0 :                         break;
     721                 : 
     722               0 :                     case WAL_COMPRESSION_LZ4:
     723                 : #ifdef USE_LZ4
     724               0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
     725 ECB             : #else
     726                 :                         elog(ERROR, "LZ4 is not supported by this build");
     727                 : #endif
     728 LBC           0 :                         break;
     729                 : 
     730 UIC           0 :                     case WAL_COMPRESSION_ZSTD:
     731 EUB             : #ifdef USE_ZSTD
     732 UIC           0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
     733                 : #else
     734 EUB             :                         elog(ERROR, "zstd is not supported by this build");
     735                 : #endif
     736 UBC           0 :                         break;
     737 EUB             : 
     738 UBC           0 :                     case WAL_COMPRESSION_NONE:
     739 UIC           0 :                         Assert(false);  /* cannot happen */
     740 EUB             :                         break;
     741                 :                         /* no default case, so that compiler will warn */
     742                 :                 }
     743                 : 
     744 UIC           0 :                 rdt_datas_last->data = regbuf->compressed_page;
     745               0 :                 rdt_datas_last->len = compressed_len;
     746 EUB             :             }
     747                 :             else
     748                 :             {
     749 GIC      399497 :                 bimg.length = BLCKSZ - cbimg.hole_length;
     750 EUB             : 
     751 GIC      399497 :                 if (cbimg.hole_length == 0)
     752                 :                 {
     753           97619 :                     rdt_datas_last->data = page;
     754 GBC       97619 :                     rdt_datas_last->len = BLCKSZ;
     755                 :                 }
     756 EUB             :                 else
     757                 :                 {
     758                 :                     /* must skip the hole */
     759 GIC      301878 :                     rdt_datas_last->data = page;
     760          301878 :                     rdt_datas_last->len = bimg.hole_offset;
     761                 : 
     762 GBC      301878 :                     rdt_datas_last->next = &regbuf->bkp_rdatas[1];
     763          301878 :                     rdt_datas_last = rdt_datas_last->next;
     764                 : 
     765 GIC      301878 :                     rdt_datas_last->data =
     766          301878 :                         page + (bimg.hole_offset + cbimg.hole_length);
     767 CBC      301878 :                     rdt_datas_last->len =
     768 GIC      301878 :                         BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
     769 ECB             :                 }
     770                 :             }
     771                 : 
     772 CBC      399497 :             total_len += bimg.length;
     773                 :         }
     774                 : 
     775 GIC    19619528 :         if (needs_data)
     776                 :         {
     777                 :             /*
     778                 :              * When copying to XLogRecordBlockHeader, the length is narrowed
     779                 :              * to an uint16.  Double-check that it is still correct.
     780                 :              */
     781 GNC    16602152 :             Assert(regbuf->rdata_len <= UINT16_MAX);
     782                 : 
     783 ECB             :             /*
     784                 :              * Link the caller-supplied rdata chain for this buffer to the
     785                 :              * overall list.
     786                 :              */
     787 CBC    16602152 :             bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
     788 GNC    16602152 :             bkpb.data_length = (uint16) regbuf->rdata_len;
     789 CBC    16602152 :             total_len += regbuf->rdata_len;
     790 ECB             : 
     791 CBC    16602152 :             rdt_datas_last->next = regbuf->rdata_head;
     792        16602152 :             rdt_datas_last = regbuf->rdata_tail;
     793                 :         }
     794                 : 
     795 GNC    19619528 :         if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
     796 ECB             :         {
     797 GIC      832420 :             samerel = true;
     798          832420 :             bkpb.fork_flags |= BKPBLOCK_SAME_REL;
     799 ECB             :         }
     800                 :         else
     801 GIC    18787108 :             samerel = false;
     802        19619528 :         prev_regbuf = regbuf;
     803                 : 
     804                 :         /* Ok, copy the header to the scratch buffer */
     805 CBC    19619528 :         memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
     806 GIC    19619528 :         scratch += SizeOfXLogRecordBlockHeader;
     807        19619528 :         if (include_image)
     808                 :         {
     809          399497 :             memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
     810          399497 :             scratch += SizeOfXLogRecordBlockImageHeader;
     811 CBC      399497 :             if (cbimg.hole_length != 0 && is_compressed)
     812 ECB             :             {
     813 LBC           0 :                 memcpy(scratch, &cbimg,
     814                 :                        SizeOfXLogRecordBlockCompressHeader);
     815               0 :                 scratch += SizeOfXLogRecordBlockCompressHeader;
     816 ECB             :             }
     817                 :         }
     818 GIC    19619528 :         if (!samerel)
     819 ECB             :         {
     820 GNC    18787108 :             memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
     821        18787108 :             scratch += sizeof(RelFileLocator);
     822 ECB             :         }
     823 GIC    19619528 :         memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
     824        19619528 :         scratch += sizeof(BlockNumber);
     825 ECB             :     }
     826                 : 
     827                 :     /* followed by the record's origin, if any */
     828 GIC    19404394 :     if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
     829 CBC    10244095 :         replorigin_session_origin != InvalidRepOriginId)
     830 ECB             :     {
     831 CBC      149492 :         *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
     832 GIC      149492 :         memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
     833 CBC      149492 :         scratch += sizeof(replorigin_session_origin);
     834 ECB             :     }
     835                 : 
     836                 :     /* followed by toplevel XID, if not already included in previous record */
     837 GBC    19404394 :     if (IsSubxactTopXidLogPending())
     838                 :     {
     839             236 :         TransactionId xid = GetTopTransactionIdIfAny();
     840                 : 
     841                 :         /* Set the flag that the top xid is included in the WAL */
     842 CBC         236 :         *topxid_included = true;
     843                 : 
     844             236 :         *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
     845             236 :         memcpy(scratch, &xid, sizeof(TransactionId));
     846 GIC         236 :         scratch += sizeof(TransactionId);
     847 ECB             :     }
     848                 : 
     849                 :     /* followed by main data, if any */
     850 GIC    19404394 :     if (mainrdata_len > 0)
     851                 :     {
     852 CBC    19046256 :         if (mainrdata_len > 255)
     853 ECB             :         {
     854                 :             uint32      mainrdata_len_4b;
     855                 : 
     856 GNC       69184 :             if (mainrdata_len > PG_UINT32_MAX)
     857 UNC           0 :                 ereport(ERROR,
     858                 :                         (errmsg_internal("too much WAL data"),
     859                 :                          errdetail_internal("Main data length is %llu bytes for a maximum of %u bytes.",
     860                 :                                             (unsigned long long) mainrdata_len,
     861                 :                                             PG_UINT32_MAX)));
     862                 : 
     863 GNC       69184 :             mainrdata_len_4b = (uint32) mainrdata_len;
     864 GIC       69184 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
     865 GNC       69184 :             memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
     866 CBC       69184 :             scratch += sizeof(uint32);
     867 ECB             :         }
     868                 :         else
     869                 :         {
     870 GIC    18977072 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
     871 CBC    18977072 :             *(scratch++) = (uint8) mainrdata_len;
     872                 :         }
     873        19046256 :         rdt_datas_last->next = mainrdata_head;
     874 GIC    19046256 :         rdt_datas_last = mainrdata_last;
     875        19046256 :         total_len += mainrdata_len;
     876 ECB             :     }
     877 GIC    19404394 :     rdt_datas_last->next = NULL;
     878 ECB             : 
     879 CBC    19404394 :     hdr_rdt.len = (scratch - hdr_scratch);
     880        19404394 :     total_len += hdr_rdt.len;
     881                 : 
     882                 :     /*
     883                 :      * Calculate CRC of the data
     884 ECB             :      *
     885                 :      * Note that the record header isn't added into the CRC initially since we
     886                 :      * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
     887                 :      * the whole record in the order: rdata, then backup blocks, then record
     888                 :      * header.
     889                 :      */
     890 CBC    19404394 :     INIT_CRC32C(rdata_crc);
     891 GBC    19404394 :     COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
     892 GIC    65952150 :     for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
     893        46547756 :         COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
     894                 : 
     895                 :     /*
     896                 :      * Ensure that the XLogRecord is not too large.
     897                 :      *
     898                 :      * XLogReader machinery is only able to handle records up to a certain
     899                 :      * size (ignoring machine resource limitations), so make sure that we will
     900                 :      * not emit records larger than the sizes advertised to be supported.
     901                 :      * This cap is based on DecodeXLogRecordRequiredSpace().
     902                 :      */
     903 GNC    19404394 :     if (total_len >= XLogRecordMaxSize)
     904 UNC           0 :         ereport(ERROR,
     905                 :                 (errmsg_internal("oversized WAL record"),
     906                 :                  errdetail_internal("WAL record would be %llu bytes (of maximum %u bytes); rmid %u flags %u.",
     907                 :                                     (unsigned long long) total_len, XLogRecordMaxSize, rmid, info)));
     908                 : 
     909                 :     /*
     910                 :      * Fill in the fields in the record header. Prev-link is filled in later,
     911 ECB             :      * once we know where in the WAL the record will be inserted. The CRC does
     912                 :      * not include the record header yet.
     913                 :      */
     914 CBC    19404394 :     rechdr->xl_xid = GetCurrentTransactionIdIfAny();
     915 GNC    19404394 :     rechdr->xl_tot_len = (uint32) total_len;
     916 GIC    19404394 :     rechdr->xl_info = info;
     917        19404394 :     rechdr->xl_rmid = rmid;
     918 CBC    19404394 :     rechdr->xl_prev = InvalidXLogRecPtr;
     919        19404394 :     rechdr->xl_crc = rdata_crc;
     920                 : 
     921        19404394 :     return &hdr_rdt;
     922 ECB             : }
     923                 : 
     924                 : /*
     925                 :  * Create a compressed version of a backup block image.
     926                 :  *
     927                 :  * Returns false if compression fails (i.e., compressed result is actually
     928                 :  * bigger than original). Otherwise, returns true and sets 'dlen' to
     929                 :  * the length of compressed block image.
     930                 :  */
     931                 : static bool
     932 UIC           0 : XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
     933                 :                         char *dest, uint16 *dlen)
     934                 : {
     935               0 :     int32       orig_len = BLCKSZ - hole_length;
     936               0 :     int32       len = -1;
     937               0 :     int32       extra_bytes = 0;
     938 ECB             :     char       *source;
     939                 :     PGAlignedBlock tmp;
     940                 : 
     941 LBC           0 :     if (hole_length != 0)
     942                 :     {
     943                 :         /* must skip the hole */
     944 UIC           0 :         source = tmp.data;
     945               0 :         memcpy(source, page, hole_offset);
     946               0 :         memcpy(source + hole_offset,
     947               0 :                page + (hole_offset + hole_length),
     948               0 :                BLCKSZ - (hole_length + hole_offset));
     949                 : 
     950                 :         /*
     951 ECB             :          * Extra data needs to be stored in WAL record for the compressed
     952 EUB             :          * version of block image if the hole exists.
     953                 :          */
     954 UIC           0 :         extra_bytes = SizeOfXLogRecordBlockCompressHeader;
     955                 :     }
     956                 :     else
     957               0 :         source = page;
     958                 : 
     959               0 :     switch ((WalCompression) wal_compression)
     960                 :     {
     961               0 :         case WAL_COMPRESSION_PGLZ:
     962 LBC           0 :             len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
     963               0 :             break;
     964 ECB             : 
     965 LBC           0 :         case WAL_COMPRESSION_LZ4:
     966 ECB             : #ifdef USE_LZ4
     967 LBC           0 :             len = LZ4_compress_default(source, dest, orig_len,
     968                 :                                        COMPRESS_BUFSIZE);
     969               0 :             if (len <= 0)
     970 UIC           0 :                 len = -1;       /* failure */
     971                 : #else
     972                 :             elog(ERROR, "LZ4 is not supported by this build");
     973                 : #endif
     974               0 :             break;
     975                 : 
     976               0 :         case WAL_COMPRESSION_ZSTD:
     977                 : #ifdef USE_ZSTD
     978               0 :             len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
     979                 :                                 ZSTD_CLEVEL_DEFAULT);
     980 UBC           0 :             if (ZSTD_isError(len))
     981 UIC           0 :                 len = -1;       /* failure */
     982                 : #else
     983 EUB             :             elog(ERROR, "zstd is not supported by this build");
     984                 : #endif
     985 UBC           0 :             break;
     986                 : 
     987 UIC           0 :         case WAL_COMPRESSION_NONE:
     988               0 :             Assert(false);      /* cannot happen */
     989 EUB             :             break;
     990                 :             /* no default case, so that compiler will warn */
     991                 :     }
     992                 : 
     993                 :     /*
     994                 :      * We recheck the actual size even if compression reports success and see
     995                 :      * if the number of bytes saved by compression is larger than the length
     996                 :      * of extra data needed for the compressed version of block image.
     997                 :      */
     998 UIC           0 :     if (len >= 0 &&
     999               0 :         len + extra_bytes < orig_len)
    1000                 :     {
    1001               0 :         *dlen = (uint16) len;   /* successful compression */
    1002 UBC           0 :         return true;
    1003                 :     }
    1004 UIC           0 :     return false;
    1005 EUB             : }
    1006                 : 
    1007                 : /*
    1008                 :  * Determine whether the buffer referenced has to be backed up.
    1009                 :  *
    1010                 :  * Since we don't yet have the insert lock, fullPageWrites and runningBackups
    1011                 :  * (which forces full-page writes) could change later, so the result should
    1012                 :  * be used for optimization purposes only.
    1013                 :  */
    1014                 : bool
    1015 GBC      188010 : XLogCheckBufferNeedsBackup(Buffer buffer)
    1016                 : {
    1017 EUB             :     XLogRecPtr  RedoRecPtr;
    1018                 :     bool        doPageWrites;
    1019                 :     Page        page;
    1020                 : 
    1021 GIC      188010 :     GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
    1022 EUB             : 
    1023 GIC      188010 :     page = BufferGetPage(buffer);
    1024 EUB             : 
    1025 GIC      188010 :     if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
    1026 GBC         808 :         return true;            /* buffer requires backup */
    1027                 : 
    1028          187202 :     return false;               /* buffer does not need to be backed up */
    1029 EUB             : }
    1030                 : 
    1031                 : /*
    1032                 :  * Write a backup block if needed when we are setting a hint. Note that
    1033                 :  * this may be called for a variety of page types, not just heaps.
    1034                 :  *
    1035                 :  * Callable while holding just share lock on the buffer content.
    1036                 :  *
    1037                 :  * We can't use the plain backup block mechanism since that relies on the
    1038                 :  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
    1039                 :  * bits) are allowed in a sharelocked buffer that can lead to wal checksum
    1040                 :  * failures. So instead we copy the page and insert the copied data as normal
    1041                 :  * record data.
    1042                 :  *
    1043                 :  * We only need to do something if page has not yet been full page written in
    1044                 :  * this checkpoint round. The LSN of the inserted wal record is returned if we
    1045                 :  * had to write, InvalidXLogRecPtr otherwise.
    1046                 :  *
    1047                 :  * It is possible that multiple concurrent backends could attempt to write WAL
    1048                 :  * records. In that case, multiple copies of the same block would be recorded
    1049                 :  * in separate WAL records by different backends, though that is still OK from
    1050                 :  * a correctness perspective.
    1051                 :  */
    1052                 : XLogRecPtr
    1053 GIC       49519 : XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
    1054                 : {
    1055           49519 :     XLogRecPtr  recptr = InvalidXLogRecPtr;
    1056                 :     XLogRecPtr  lsn;
    1057                 :     XLogRecPtr  RedoRecPtr;
    1058                 : 
    1059                 :     /*
    1060                 :      * Ensure no checkpoint can change our view of RedoRecPtr.
    1061                 :      */
    1062           49519 :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);
    1063 ECB             : 
    1064                 :     /*
    1065                 :      * Update RedoRecPtr so that we can make the right decision
    1066                 :      */
    1067 GIC       49519 :     RedoRecPtr = GetRedoRecPtr();
    1068                 : 
    1069 ECB             :     /*
    1070                 :      * We assume page LSN is first data on *every* page that can be passed to
    1071                 :      * XLogInsert, whether it has the standard page layout or not. Since we're
    1072                 :      * only holding a share-lock on the page, we must take the buffer header
    1073                 :      * lock when we look at the LSN.
    1074                 :      */
    1075 GIC       49519 :     lsn = BufferGetLSNAtomic(buffer);
    1076 ECB             : 
    1077 GIC       49519 :     if (lsn <= RedoRecPtr)
    1078                 :     {
    1079            5122 :         int         flags = 0;
    1080                 :         PGAlignedBlock copied_buffer;
    1081            5122 :         char       *origdata = (char *) BufferGetBlock(buffer);
    1082                 :         RelFileLocator rlocator;
    1083                 :         ForkNumber  forkno;
    1084                 :         BlockNumber blkno;
    1085                 : 
    1086                 :         /*
    1087                 :          * Copy buffer so we don't have to worry about concurrent hint bit or
    1088                 :          * lsn updates. We assume pd_lower/upper cannot be changed without an
    1089                 :          * exclusive lock, so the contents bkp are not racy.
    1090                 :          */
    1091            5122 :         if (buffer_std)
    1092                 :         {
    1093                 :             /* Assume we can omit data between pd_lower and pd_upper */
    1094            3722 :             Page        page = BufferGetPage(buffer);
    1095            3722 :             uint16      lower = ((PageHeader) page)->pd_lower;
    1096            3722 :             uint16      upper = ((PageHeader) page)->pd_upper;
    1097                 : 
    1098            3722 :             memcpy(copied_buffer.data, origdata, lower);
    1099            3722 :             memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
    1100                 :         }
    1101 ECB             :         else
    1102 GIC        1400 :             memcpy(copied_buffer.data, origdata, BLCKSZ);
    1103 ECB             : 
    1104 GIC        5122 :         XLogBeginInsert();
    1105                 : 
    1106            5122 :         if (buffer_std)
    1107            3722 :             flags |= REGBUF_STANDARD;
    1108                 : 
    1109 GNC        5122 :         BufferGetTag(buffer, &rlocator, &forkno, &blkno);
    1110            5122 :         XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
    1111                 : 
    1112 GIC        5122 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
    1113                 :     }
    1114                 : 
    1115 CBC       49519 :     return recptr;
    1116                 : }
    1117                 : 
    1118                 : /*
    1119                 :  * Write a WAL record containing a full image of a page. Caller is responsible
    1120                 :  * for writing the page to disk after calling this routine.
    1121                 :  *
    1122                 :  * Note: If you're using this function, you should be building pages in private
    1123 ECB             :  * memory and writing them directly to smgr.  If you're using buffers, call
    1124                 :  * log_newpage_buffer instead.
    1125                 :  *
    1126                 :  * If the page follows the standard page layout, with a PageHeader and unused
    1127                 :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1128                 :  * the unused space to be left out from the WAL record, making it smaller.
    1129                 :  */
    1130                 : XLogRecPtr
    1131 GNC      221528 : log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
    1132                 :             Page page, bool page_std)
    1133                 : {
    1134                 :     int         flags;
    1135                 :     XLogRecPtr  recptr;
    1136                 : 
    1137 GIC      221528 :     flags = REGBUF_FORCE_IMAGE;
    1138          221528 :     if (page_std)
    1139 CBC      221445 :         flags |= REGBUF_STANDARD;
    1140                 : 
    1141 GIC      221528 :     XLogBeginInsert();
    1142 GNC      221528 :     XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);
    1143 CBC      221528 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1144 ECB             : 
    1145                 :     /*
    1146                 :      * The page may be uninitialized. If so, we can't set the LSN because that
    1147                 :      * would corrupt the page.
    1148                 :      */
    1149 GIC      221528 :     if (!PageIsNew(page))
    1150 ECB             :     {
    1151 GIC      221522 :         PageSetLSN(page, recptr);
    1152 ECB             :     }
    1153                 : 
    1154 CBC      221528 :     return recptr;
    1155 ECB             : }
    1156                 : 
    1157                 : /*
    1158                 :  * Like log_newpage(), but allows logging multiple pages in one operation.
    1159                 :  * It is more efficient than calling log_newpage() for each page separately,
    1160                 :  * because we can write multiple pages in a single WAL record.
    1161                 :  */
    1162                 : void
    1163 GNC          19 : log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
    1164                 :              BlockNumber *blknos, Page *pages, bool page_std)
    1165                 : {
    1166                 :     int         flags;
    1167                 :     XLogRecPtr  recptr;
    1168                 :     int         i;
    1169                 :     int         j;
    1170                 : 
    1171 GIC          19 :     flags = REGBUF_FORCE_IMAGE;
    1172              19 :     if (page_std)
    1173              19 :         flags |= REGBUF_STANDARD;
    1174                 : 
    1175                 :     /*
    1176                 :      * Iterate over all the pages. They are collected into batches of
    1177                 :      * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
    1178                 :      * batch.
    1179 ECB             :      */
    1180 GIC          19 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
    1181                 : 
    1182              19 :     i = 0;
    1183              38 :     while (i < num_pages)
    1184                 :     {
    1185 CBC          19 :         int         batch_start = i;
    1186 ECB             :         int         nbatch;
    1187                 : 
    1188 GIC          19 :         XLogBeginInsert();
    1189 ECB             : 
    1190 CBC          19 :         nbatch = 0;
    1191             475 :         while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
    1192                 :         {
    1193 GNC         456 :             XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
    1194 GIC         456 :             i++;
    1195             456 :             nbatch++;
    1196                 :         }
    1197 ECB             : 
    1198 GIC          19 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1199 ECB             : 
    1200 GIC         475 :         for (j = batch_start; j < i; j++)
    1201                 :         {
    1202 ECB             :             /*
    1203                 :              * The page may be uninitialized. If so, we can't set the LSN
    1204                 :              * because that would corrupt the page.
    1205                 :              */
    1206 GIC         456 :             if (!PageIsNew(pages[j]))
    1207                 :             {
    1208             456 :                 PageSetLSN(pages[j], recptr);
    1209                 :             }
    1210                 :         }
    1211 ECB             :     }
    1212 GIC          19 : }
    1213                 : 
    1214                 : /*
    1215                 :  * Write a WAL record containing a full image of a page.
    1216                 :  *
    1217                 :  * Caller should initialize the buffer and mark it dirty before calling this
    1218                 :  * function.  This function will set the page LSN.
    1219 ECB             :  *
    1220                 :  * If the page follows the standard page layout, with a PageHeader and unused
    1221                 :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1222                 :  * the unused space to be left out from the WAL record, making it smaller.
    1223                 :  */
    1224                 : XLogRecPtr
    1225 GIC      104032 : log_newpage_buffer(Buffer buffer, bool page_std)
    1226                 : {
    1227          104032 :     Page        page = BufferGetPage(buffer);
    1228                 :     RelFileLocator rlocator;
    1229                 :     ForkNumber  forknum;
    1230 ECB             :     BlockNumber blkno;
    1231                 : 
    1232                 :     /* Shared buffers should be modified in a critical section. */
    1233 CBC      104032 :     Assert(CritSectionCount > 0);
    1234                 : 
    1235 GNC      104032 :     BufferGetTag(buffer, &rlocator, &forknum, &blkno);
    1236 ECB             : 
    1237 GNC      104032 :     return log_newpage(&rlocator, forknum, blkno, page, page_std);
    1238 ECB             : }
    1239                 : 
    1240                 : /*
    1241                 :  * WAL-log a range of blocks in a relation.
    1242                 :  *
    1243                 :  * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
    1244                 :  * written to the WAL. If the range is large, this is done in multiple WAL
    1245                 :  * records.
    1246                 :  *
    1247                 :  * If all page follows the standard page layout, with a PageHeader and unused
    1248                 :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1249                 :  * the unused space to be left out from the WAL records, making them smaller.
    1250                 :  *
    1251                 :  * NOTE: This function acquires exclusive-locks on the pages. Typically, this
    1252                 :  * is used on a newly-built relation, and the caller is holding a
    1253                 :  * AccessExclusiveLock on it, so no other backend can be accessing it at the
    1254                 :  * same time. If that's not the case, you must ensure that this does not
    1255                 :  * cause a deadlock through some other means.
    1256                 :  */
    1257                 : void
    1258 GNC       31653 : log_newpage_range(Relation rel, ForkNumber forknum,
    1259                 :                   BlockNumber startblk, BlockNumber endblk,
    1260 ECB             :                   bool page_std)
    1261                 : {
    1262                 :     int         flags;
    1263                 :     BlockNumber blkno;
    1264                 : 
    1265 GIC       31653 :     flags = REGBUF_FORCE_IMAGE;
    1266           31653 :     if (page_std)
    1267             257 :         flags |= REGBUF_STANDARD;
    1268                 : 
    1269                 :     /*
    1270                 :      * Iterate over all the pages in the range. They are collected into
    1271                 :      * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
    1272                 :      * for each batch.
    1273 ECB             :      */
    1274 GIC       31653 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
    1275 ECB             : 
    1276 GIC       31653 :     blkno = startblk;
    1277           55321 :     while (blkno < endblk)
    1278                 :     {
    1279                 :         Buffer      bufpack[XLR_MAX_BLOCK_ID];
    1280                 :         XLogRecPtr  recptr;
    1281 ECB             :         int         nbufs;
    1282                 :         int         i;
    1283                 : 
    1284 GIC       23668 :         CHECK_FOR_INTERRUPTS();
    1285 ECB             : 
    1286                 :         /* Collect a batch of blocks. */
    1287 GIC       23668 :         nbufs = 0;
    1288          114707 :         while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
    1289                 :         {
    1290 GNC       91039 :             Buffer      buf = ReadBufferExtended(rel, forknum, blkno,
    1291                 :                                                  RBM_NORMAL, NULL);
    1292                 : 
    1293 GIC       91039 :             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
    1294                 : 
    1295                 :             /*
    1296                 :              * Completely empty pages are not WAL-logged. Writing a WAL record
    1297                 :              * would change the LSN, and we don't want that. We want the page
    1298                 :              * to stay empty.
    1299                 :              */
    1300           91039 :             if (!PageIsNew(BufferGetPage(buf)))
    1301           90938 :                 bufpack[nbufs++] = buf;
    1302                 :             else
    1303             101 :                 UnlockReleaseBuffer(buf);
    1304           91039 :             blkno++;
    1305                 :         }
    1306 ECB             : 
    1307                 :         /* Write WAL record for this batch. */
    1308 GIC       23668 :         XLogBeginInsert();
    1309                 : 
    1310           23668 :         START_CRIT_SECTION();
    1311          114606 :         for (i = 0; i < nbufs; i++)
    1312                 :         {
    1313 CBC       90938 :             XLogRegisterBuffer(i, bufpack[i], flags);
    1314           90938 :             MarkBufferDirty(bufpack[i]);
    1315 ECB             :         }
    1316                 : 
    1317 GIC       23668 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1318                 : 
    1319          114606 :         for (i = 0; i < nbufs; i++)
    1320                 :         {
    1321           90938 :             PageSetLSN(BufferGetPage(bufpack[i]), recptr);
    1322 CBC       90938 :             UnlockReleaseBuffer(bufpack[i]);
    1323                 :         }
    1324           23668 :         END_CRIT_SECTION();
    1325 ECB             :     }
    1326 GIC       31653 : }
    1327                 : 
    1328                 : /*
    1329                 :  * Allocate working buffers needed for WAL record construction.
    1330                 :  */
    1331                 : void
    1332 CBC       13218 : InitXLogInsert(void)
    1333                 : {
    1334                 : #ifdef USE_ASSERT_CHECKING
    1335                 : 
    1336                 :     /*
    1337                 :      * Check that any records assembled can be decoded.  This is capped based
    1338                 :      * on what XLogReader would require at its maximum bound.  This code path
    1339                 :      * is called once per backend, more than enough for this check.
    1340                 :      */
    1341 GNC       13218 :     size_t      max_required = DecodeXLogRecordRequiredSpace(XLogRecordMaxSize);
    1342                 : 
    1343           13218 :     Assert(AllocSizeIsValid(max_required));
    1344                 : #endif
    1345                 : 
    1346                 :     /* Initialize the working areas */
    1347 CBC       13218 :     if (xloginsert_cxt == NULL)
    1348 ECB             :     {
    1349 GIC       13218 :         xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
    1350 ECB             :                                                "WAL record construction",
    1351                 :                                                ALLOCSET_DEFAULT_SIZES);
    1352                 :     }
    1353                 : 
    1354 GIC       13218 :     if (registered_buffers == NULL)
    1355                 :     {
    1356           13218 :         registered_buffers = (registered_buffer *)
    1357           13218 :             MemoryContextAllocZero(xloginsert_cxt,
    1358                 :                                    sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
    1359           13218 :         max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
    1360 ECB             :     }
    1361 CBC       13218 :     if (rdatas == NULL)
    1362                 :     {
    1363           13218 :         rdatas = MemoryContextAlloc(xloginsert_cxt,
    1364 ECB             :                                     sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
    1365 GIC       13218 :         max_rdatas = XLR_NORMAL_RDATAS;
    1366                 :     }
    1367                 : 
    1368 ECB             :     /*
    1369                 :      * Allocate a buffer to hold the header information for a WAL record.
    1370                 :      */
    1371 CBC       13218 :     if (hdr_scratch == NULL)
    1372 GIC       13218 :         hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
    1373 ECB             :                                              HEADER_SCRATCH_SIZE);
    1374 CBC       13218 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a