LCOV - differential code coverage report
Current view: top level - src/backend/access/nbtree - nbtxlog.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 15:15:32
Coverage (Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB):
Lines: 92.5 % 505 467 2 7 10 19 8 76 3 380 10 78 1 2
Functions: 88.2 % 17 15 2 6 1 8 2 6
Baseline: 15
Baseline Date: 2023-04-08 15:09:40

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * nbtxlog.c
       4                 :  *    WAL replay logic for btrees.
       5                 :  *
       6                 :  *
       7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       8                 :  * Portions Copyright (c) 1994, Regents of the University of California
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/access/nbtree/nbtxlog.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include "access/bufmask.h"
      18                 : #include "access/nbtree.h"
      19                 : #include "access/nbtxlog.h"
      20                 : #include "access/transam.h"
      21                 : #include "access/xlog.h"
      22                 : #include "access/xlogutils.h"
      23                 : #include "miscadmin.h"
      24                 : #include "storage/procarray.h"
      25                 : #include "utils/memutils.h"
      26                 : 
      27                 : static MemoryContext opCtx;     /* working memory for operations */
      28                 : 
      29                 : /*
      30                 :  * _bt_restore_page -- re-enter all the index tuples on a page
      31                 :  *
      32                 :  * The page is freshly init'd, and *from (length len) is a copy of what
      33                 :  * had been its upper part (pd_upper to pd_special).  We assume that the
      34                 :  * tuples had been added to the page in item-number order, and therefore
      35                 :  * the one with highest item number appears first (lowest on the page).
      36                 :  */
      37                 : static void
      38 CBC        1435 : _bt_restore_page(Page page, char *from, int len)
      39                 : {
      40                 :     IndexTupleData itupdata;
      41                 :     Size        itemsz;
      42            1435 :     char       *end = from + len;
      43                 :     Item        items[MaxIndexTuplesPerPage];
      44                 :     uint16      itemsizes[MaxIndexTuplesPerPage];
      45                 :     int         i;
      46                 :     int         nitems;
      47                 : 
      48                 :     /*
      49                 :      * To get the items back in the original order, we add them to the page in
      50                 :      * reverse.  To figure out where one tuple ends and another begins, we
      51                 :      * have to scan them in forward order first.
      52                 :      */
      53            1435 :     i = 0;
      54           92524 :     while (from < end)
      55                 :     {
      56                 :         /*
      57                 :          * As we step through the items, 'from' won't always be properly
      58                 :          * aligned, so we need to use memcpy().  Further, we use Item (which
      59                 :          * is just a char*) here for our items array for the same reason;
      60                 :          * wouldn't want the compiler or anyone thinking that an item is
      61                 :          * aligned when it isn't.
      62                 :          */
      63           91089 :         memcpy(&itupdata, from, sizeof(IndexTupleData));
      64           91089 :         itemsz = IndexTupleSize(&itupdata);
      65           91089 :         itemsz = MAXALIGN(itemsz);
      66                 : 
      67           91089 :         items[i] = (Item) from;
      68           91089 :         itemsizes[i] = itemsz;
      69           91089 :         i++;
      70                 : 
      71           91089 :         from += itemsz;
      72                 :     }
      73            1435 :     nitems = i;
      74                 : 
      75           92524 :     for (i = nitems - 1; i >= 0; i--)
      76                 :     {
      77           91089 :         if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
      78                 :                         false, false) == InvalidOffsetNumber)
      79 UBC           0 :             elog(PANIC, "_bt_restore_page: cannot add item to page");
      80                 :     }
      81 CBC        1435 : }
      82                 : 
      83                 : static void
      84             598 : _bt_restore_meta(XLogReaderState *record, uint8 block_id)
      85                 : {
      86             598 :     XLogRecPtr  lsn = record->EndRecPtr;
      87                 :     Buffer      metabuf;
      88                 :     Page        metapg;
      89                 :     BTMetaPageData *md;
      90                 :     BTPageOpaque pageop;
      91                 :     xl_btree_metadata *xlrec;
      92                 :     char       *ptr;
      93                 :     Size        len;
      94                 : 
      95             598 :     metabuf = XLogInitBufferForRedo(record, block_id);
      96             598 :     ptr = XLogRecGetBlockData(record, block_id, &len);
      97                 : 
      98             598 :     Assert(len == sizeof(xl_btree_metadata));
      99             598 :     Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
     100             598 :     xlrec = (xl_btree_metadata *) ptr;
     101             598 :     metapg = BufferGetPage(metabuf);
     102                 : 
     103             598 :     _bt_pageinit(metapg, BufferGetPageSize(metabuf));
     104                 : 
     105             598 :     md = BTPageGetMeta(metapg);
     106             598 :     md->btm_magic = BTREE_MAGIC;
     107             598 :     md->btm_version = xlrec->version;
     108             598 :     md->btm_root = xlrec->root;
     109             598 :     md->btm_level = xlrec->level;
     110             598 :     md->btm_fastroot = xlrec->fastroot;
     111             598 :     md->btm_fastlevel = xlrec->fastlevel;
     112                 :     /* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
     113             598 :     Assert(md->btm_version >= BTREE_NOVAC_VERSION);
     114             598 :     md->btm_last_cleanup_num_delpages = xlrec->last_cleanup_num_delpages;
     115             598 :     md->btm_last_cleanup_num_heap_tuples = -1.0;
     116             598 :     md->btm_allequalimage = xlrec->allequalimage;
     117                 : 
     118             598 :     pageop = BTPageGetOpaque(metapg);
     119             598 :     pageop->btpo_flags = BTP_META;
     120                 : 
     121                 :     /*
     122                 :      * Set pd_lower just past the end of the metadata.  This is essential,
     123                 :      * because without doing so, metadata will be lost if xlog.c compresses
     124                 :      * the page.
     125                 :      */
     126             598 :     ((PageHeader) metapg)->pd_lower =
     127             598 :         ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
     128                 : 
     129             598 :     PageSetLSN(metapg, lsn);
     130             598 :     MarkBufferDirty(metabuf);
     131             598 :     UnlockReleaseBuffer(metabuf);
     132             598 : }
     133                 : 
     134                 : /*
     135                 :  * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
     136                 :  *
     137                 :  * This is a common subroutine of the redo functions of all the WAL record
     138                 :  * types that can insert a downlink: insert, split, and newroot.
     139                 :  */
     140                 : static void
     141            1383 : _bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
     142                 : {
     143            1383 :     XLogRecPtr  lsn = record->EndRecPtr;
     144                 :     Buffer      buf;
     145                 : 
     146            1383 :     if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
     147                 :     {
     148            1383 :         Page        page = (Page) BufferGetPage(buf);
     149            1383 :         BTPageOpaque pageop = BTPageGetOpaque(page);
     150                 : 
     151            1383 :         Assert(P_INCOMPLETE_SPLIT(pageop));
     152            1383 :         pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
     153                 : 
     154            1383 :         PageSetLSN(page, lsn);
     155            1383 :         MarkBufferDirty(buf);
     156                 :     }
     157            1383 :     if (BufferIsValid(buf))
     158            1383 :         UnlockReleaseBuffer(buf);
     159            1383 : }
     160                 : 
     161                 : static void
     162          466915 : btree_xlog_insert(bool isleaf, bool ismeta, bool posting,
     163                 :                   XLogReaderState *record)
     164                 : {
     165          466915 :     XLogRecPtr  lsn = record->EndRecPtr;
     166          466915 :     xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
     167                 :     Buffer      buffer;
     168                 :     Page        page;
     169                 : 
     170                 :     /*
     171                 :      * Insertion to an internal page finishes an incomplete split at the child
     172                 :      * level.  Clear the incomplete-split flag in the child.  Note: during
     173                 :      * normal operation, the child and parent pages are locked at the same
     174                 :      * time (the locks are coupled), so that clearing the flag and inserting
     175                 :      * the downlink appear atomic to other backends.  We don't bother with
     176                 :      * that during replay, because readers don't care about the
     177                 :      * incomplete-split flag and there cannot be updates happening.
     178                 :      */
     179          466915 :     if (!isleaf)
     180            1280 :         _bt_clear_incomplete_split(record, 1);
     181          466915 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     182                 :     {
     183                 :         Size        datalen;
     184          464549 :         char       *datapos = XLogRecGetBlockData(record, 0, &datalen);
     185                 : 
     186          464549 :         page = BufferGetPage(buffer);
     187                 : 
     188          464549 :         if (!posting)
     189                 :         {
     190                 :             /* Simple retail insertion */
     191          462525 :             if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
     192                 :                             false, false) == InvalidOffsetNumber)
     193 UBC           0 :                 elog(PANIC, "failed to add new item");
     194                 :         }
     195                 :         else
     196                 :         {
     197                 :             ItemId      itemid;
     198                 :             IndexTuple  oposting,
     199                 :                         newitem,
     200                 :                         nposting;
     201                 :             uint16      postingoff;
     202                 : 
     203                 :             /*
     204                 :              * A posting list split occurred during leaf page insertion.  WAL
     205                 :              * record data will start with an offset number representing the
     206                 :              * point in an existing posting list that a split occurs at.
     207                 :              *
     208                 :              * Use _bt_swap_posting() to repeat posting list split steps from
     209                 :              * primary.  Note that newitem from WAL record is 'orignewitem',
     210                 :              * not the final version of newitem that is actually inserted on
     211                 :              * page.
     212                 :              */
     213 CBC        2024 :             postingoff = *((uint16 *) datapos);
     214            2024 :             datapos += sizeof(uint16);
     215            2024 :             datalen -= sizeof(uint16);
     216                 : 
     217            2024 :             itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum));
     218            2024 :             oposting = (IndexTuple) PageGetItem(page, itemid);
     219                 : 
     220                 :             /* Use mutable, aligned newitem copy in _bt_swap_posting() */
     221            2024 :             Assert(isleaf && postingoff > 0);
     222            2024 :             newitem = CopyIndexTuple((IndexTuple) datapos);
     223            2024 :             nposting = _bt_swap_posting(newitem, oposting, postingoff);
     224                 : 
     225                 :             /* Replace existing posting list with post-split version */
     226            2024 :             memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting)));
     227                 : 
     228                 :             /* Insert "final" new item (not orignewitem from WAL stream) */
     229            2024 :             Assert(IndexTupleSize(newitem) == datalen);
     230            2024 :             if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum,
     231                 :                             false, false) == InvalidOffsetNumber)
     232 UBC           0 :                 elog(PANIC, "failed to add posting split new item");
     233                 :         }
     234                 : 
     235 CBC      464549 :         PageSetLSN(page, lsn);
     236          464549 :         MarkBufferDirty(buffer);
     237                 :     }
     238          466915 :     if (BufferIsValid(buffer))
     239          466915 :         UnlockReleaseBuffer(buffer);
     240                 : 
     241                 :     /*
     242                 :      * Note: in normal operation, we'd update the metapage while still holding
     243                 :      * lock on the page we inserted into.  But during replay it's not
     244                 :      * necessary to hold that lock, since no other index updates can be
     245                 :      * happening concurrently, and readers will cope fine with following an
     246                 :      * obsolete link from the metapage.
     247                 :      */
     248          466915 :     if (ismeta)
     249               4 :         _bt_restore_meta(record, 2);
     250          466915 : }
     251                 : 
     252                 : static void
     253            1383 : btree_xlog_split(bool newitemonleft, XLogReaderState *record)
     254                 : {
     255            1383 :     XLogRecPtr  lsn = record->EndRecPtr;
     256            1383 :     xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
     257            1383 :     bool        isleaf = (xlrec->level == 0);
     258                 :     Buffer      buf;
     259                 :     Buffer      rbuf;
     260                 :     Page        rpage;
     261                 :     BTPageOpaque ropaque;
     262                 :     char       *datapos;
     263                 :     Size        datalen;
     264                 :     BlockNumber origpagenumber;
     265                 :     BlockNumber rightpagenumber;
     266                 :     BlockNumber spagenumber;
     267                 : 
     268            1383 :     XLogRecGetBlockTag(record, 0, NULL, NULL, &origpagenumber);
     269            1383 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &rightpagenumber);
     270            1383 :     if (!XLogRecGetBlockTagExtended(record, 2, NULL, NULL, &spagenumber, NULL))
     271             849 :         spagenumber = P_NONE;
     272                 : 
     273                 :     /*
     274                 :      * Clear the incomplete split flag on the appropriate child page one level
     275                 :      * down when origpage/buf is an internal page (there must have been
     276                 :      * cascading page splits during original execution in the event of an
     277                 :      * internal page split).  This is like the corresponding btree_xlog_insert
     278                 :      * call for internal pages.  We're not clearing the incomplete split flag
     279                 :      * for the current page split here (you can think of this as part of the
     280                 :      * insert of newitem that the page split action needs to perform in
     281                 :      * passing).
     282                 :      *
     283                 :      * Like in btree_xlog_insert, this can be done before locking other pages.
     284                 :      * We never need to couple cross-level locks in REDO routines.
     285                 :      */
     286            1383 :     if (!isleaf)
     287              51 :         _bt_clear_incomplete_split(record, 3);
     288                 : 
     289                 :     /* Reconstruct right (new) sibling page from scratch */
     290            1383 :     rbuf = XLogInitBufferForRedo(record, 1);
     291            1383 :     datapos = XLogRecGetBlockData(record, 1, &datalen);
     292            1383 :     rpage = (Page) BufferGetPage(rbuf);
     293                 : 
     294            1383 :     _bt_pageinit(rpage, BufferGetPageSize(rbuf));
     295            1383 :     ropaque = BTPageGetOpaque(rpage);
     296                 : 
     297            1383 :     ropaque->btpo_prev = origpagenumber;
     298            1383 :     ropaque->btpo_next = spagenumber;
     299            1383 :     ropaque->btpo_level = xlrec->level;
     300            1383 :     ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
     301            1383 :     ropaque->btpo_cycleid = 0;
     302                 : 
     303            1383 :     _bt_restore_page(rpage, datapos, datalen);
     304                 : 
     305            1383 :     PageSetLSN(rpage, lsn);
     306            1383 :     MarkBufferDirty(rbuf);
     307                 : 
     308                 :     /* Now reconstruct original page (left half of split) */
     309            1383 :     if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
     310                 :     {
     311                 :         /*
     312                 :          * To retain the same physical order of the tuples that they had, we
     313                 :          * initialize a temporary empty page for the left page and add all the
     314                 :          * items to that in item number order.  This mirrors how _bt_split()
     315                 :          * works.  Retaining the same physical order makes WAL consistency
     316                 :          * checking possible.  See also _bt_restore_page(), which does the
     317                 :          * same for the right page.
     318                 :          */
     319            1376 :         Page        origpage = (Page) BufferGetPage(buf);
     320            1376 :         BTPageOpaque oopaque = BTPageGetOpaque(origpage);
     321                 :         OffsetNumber off;
     322            1376 :         IndexTuple  newitem = NULL,
     323            1376 :                     left_hikey = NULL,
     324            1376 :                     nposting = NULL;
     325            1376 :         Size        newitemsz = 0,
     326            1376 :                     left_hikeysz = 0;
     327                 :         Page        leftpage;
     328                 :         OffsetNumber leftoff,
     329            1376 :                     replacepostingoff = InvalidOffsetNumber;
     330                 : 
     331            1376 :         datapos = XLogRecGetBlockData(record, 0, &datalen);
     332                 : 
     333            1376 :         if (newitemonleft || xlrec->postingoff != 0)
     334                 :         {
     335             167 :             newitem = (IndexTuple) datapos;
     336             167 :             newitemsz = MAXALIGN(IndexTupleSize(newitem));
     337             167 :             datapos += newitemsz;
     338             167 :             datalen -= newitemsz;
     339                 : 
     340             167 :             if (xlrec->postingoff != 0)
     341                 :             {
     342                 :                 ItemId      itemid;
     343                 :                 IndexTuple  oposting;
     344                 : 
     345                 :                 /* Posting list must be at offset number before new item's */
     346               4 :                 replacepostingoff = OffsetNumberPrev(xlrec->newitemoff);
     347                 : 
     348                 :                 /* Use mutable, aligned newitem copy in _bt_swap_posting() */
     349               4 :                 newitem = CopyIndexTuple(newitem);
     350               4 :                 itemid = PageGetItemId(origpage, replacepostingoff);
     351               4 :                 oposting = (IndexTuple) PageGetItem(origpage, itemid);
     352               4 :                 nposting = _bt_swap_posting(newitem, oposting,
     353               4 :                                             xlrec->postingoff);
     354                 :             }
     355                 :         }
     356                 : 
     357                 :         /*
     358                 :          * Extract left hikey and its size.  We assume that 16-bit alignment
     359                 :          * is enough to apply IndexTupleSize (since it's fetching from a
     360                 :          * uint16 field).
     361                 :          */
     362            1376 :         left_hikey = (IndexTuple) datapos;
     363            1376 :         left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
     364            1376 :         datapos += left_hikeysz;
     365            1376 :         datalen -= left_hikeysz;
     366                 : 
     367            1376 :         Assert(datalen == 0);
     368                 : 
     369            1376 :         leftpage = PageGetTempPageCopySpecial(origpage);
     370                 : 
     371                 :         /* Add high key tuple from WAL record to temp page */
     372            1376 :         leftoff = P_HIKEY;
     373            1376 :         if (PageAddItem(leftpage, (Item) left_hikey, left_hikeysz, P_HIKEY,
     374                 :                         false, false) == InvalidOffsetNumber)
     375 UBC           0 :             elog(ERROR, "failed to add high key to left page after split");
     376 CBC        1376 :         leftoff = OffsetNumberNext(leftoff);
     377                 : 
     378          306572 :         for (off = P_FIRSTDATAKEY(oopaque); off < xlrec->firstrightoff; off++)
     379                 :         {
     380                 :             ItemId      itemid;
     381                 :             Size        itemsz;
     382                 :             IndexTuple  item;
     383                 : 
     384                 :             /* Add replacement posting list when required */
     385          305196 :             if (off == replacepostingoff)
     386                 :             {
     387               4 :                 Assert(newitemonleft ||
     388                 :                        xlrec->firstrightoff == xlrec->newitemoff);
     389               4 :                 if (PageAddItem(leftpage, (Item) nposting,
     390                 :                                 MAXALIGN(IndexTupleSize(nposting)), leftoff,
     391                 :                                 false, false) == InvalidOffsetNumber)
     392 UBC           0 :                     elog(ERROR, "failed to add new posting list item to left page after split");
     393 CBC           4 :                 leftoff = OffsetNumberNext(leftoff);
     394               4 :                 continue;       /* don't insert oposting */
     395                 :             }
     396                 : 
     397                 :             /* add the new item if it was inserted on left page */
     398          305192 :             else if (newitemonleft && off == xlrec->newitemoff)
     399                 :             {
     400             144 :                 if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
     401                 :                                 false, false) == InvalidOffsetNumber)
     402 UBC           0 :                     elog(ERROR, "failed to add new item to left page after split");
     403 CBC         144 :                 leftoff = OffsetNumberNext(leftoff);
     404                 :             }
     405                 : 
     406          305192 :             itemid = PageGetItemId(origpage, off);
     407          305192 :             itemsz = ItemIdGetLength(itemid);
     408          305192 :             item = (IndexTuple) PageGetItem(origpage, itemid);
     409          305192 :             if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
     410                 :                             false, false) == InvalidOffsetNumber)
     411 UBC           0 :                 elog(ERROR, "failed to add old item to left page after split");
     412 CBC      305192 :             leftoff = OffsetNumberNext(leftoff);
     413                 :         }
     414                 : 
     415                 :         /* cope with possibility that newitem goes at the end */
     416            1376 :         if (newitemonleft && off == xlrec->newitemoff)
     417                 :         {
     418              22 :             if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
     419                 :                             false, false) == InvalidOffsetNumber)
     420 UBC           0 :                 elog(ERROR, "failed to add new item to left page after split");
     421 CBC          22 :             leftoff = OffsetNumberNext(leftoff);
     422                 :         }
     423                 : 
     424            1376 :         PageRestoreTempPage(leftpage, origpage);
     425                 : 
     426                 :         /* Fix opaque fields */
     427            1376 :         oopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
     428            1376 :         if (isleaf)
     429            1325 :             oopaque->btpo_flags |= BTP_LEAF;
     430            1376 :         oopaque->btpo_next = rightpagenumber;
     431            1376 :         oopaque->btpo_cycleid = 0;
     432                 : 
     433            1376 :         PageSetLSN(origpage, lsn);
     434            1376 :         MarkBufferDirty(buf);
     435                 :     }
     436                 : 
     437                 :     /* Fix left-link of the page to the right of the new right sibling */
     438            1383 :     if (spagenumber != P_NONE)
     439                 :     {
     440                 :         Buffer      sbuf;
     441                 : 
     442             534 :         if (XLogReadBufferForRedo(record, 2, &sbuf) == BLK_NEEDS_REDO)
     443                 :         {
     444             476 :             Page        spage = (Page) BufferGetPage(sbuf);
     445             476 :             BTPageOpaque spageop = BTPageGetOpaque(spage);
     446                 : 
     447             476 :             spageop->btpo_prev = rightpagenumber;
     448                 : 
     449             476 :             PageSetLSN(spage, lsn);
     450             476 :             MarkBufferDirty(sbuf);
     451                 :         }
     452             534 :         if (BufferIsValid(sbuf))
     453             534 :             UnlockReleaseBuffer(sbuf);
     454                 :     }
     455                 : 
     456                 :     /*
     457                 :      * Finally, release the remaining buffers.  sbuf, rbuf, and buf must be
     458                 :      * released together, so that readers cannot observe inconsistencies.
     459                 :      */
     460            1383 :     UnlockReleaseBuffer(rbuf);
     461            1383 :     if (BufferIsValid(buf))
     462            1383 :         UnlockReleaseBuffer(buf);
     463            1383 : }
     464                 : 
     465                 : static void
     466            2010 : btree_xlog_dedup(XLogReaderState *record)
     467                 : {
     468            2010 :     XLogRecPtr  lsn = record->EndRecPtr;
     469            2010 :     xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record);
     470                 :     Buffer      buf;
     471                 : 
     472            2010 :     if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
     473                 :     {
     474            2006 :         char       *ptr = XLogRecGetBlockData(record, 0, NULL);
     475            2006 :         Page        page = (Page) BufferGetPage(buf);
     476            2006 :         BTPageOpaque opaque = BTPageGetOpaque(page);
     477                 :         OffsetNumber offnum,
     478                 :                     minoff,
     479                 :                     maxoff;
     480                 :         BTDedupState state;
     481                 :         BTDedupInterval *intervals;
     482                 :         Page        newpage;
     483                 : 
     484            2006 :         state = (BTDedupState) palloc(sizeof(BTDedupStateData));
     485            2006 :         state->deduplicate = true;   /* unused */
     486            2006 :         state->nmaxitems = 0;    /* unused */
     487                 :         /* Conservatively use larger maxpostingsize than primary */
     488            2006 :         state->maxpostingsize = BTMaxItemSize(page);
     489            2006 :         state->base = NULL;
     490            2006 :         state->baseoff = InvalidOffsetNumber;
     491            2006 :         state->basetupsize = 0;
     492            2006 :         state->htids = palloc(state->maxpostingsize);
     493            2006 :         state->nhtids = 0;
     494            2006 :         state->nitems = 0;
     495            2006 :         state->phystupsize = 0;
     496            2006 :         state->nintervals = 0;
     497                 : 
     498            2006 :         minoff = P_FIRSTDATAKEY(opaque);
     499            2006 :         maxoff = PageGetMaxOffsetNumber(page);
     500            2006 :         newpage = PageGetTempPageCopySpecial(page);
     501                 : 
     502            2006 :         if (!P_RIGHTMOST(opaque))
     503                 :         {
     504            1726 :             ItemId      itemid = PageGetItemId(page, P_HIKEY);
     505            1726 :             Size        itemsz = ItemIdGetLength(itemid);
     506            1726 :             IndexTuple  item = (IndexTuple) PageGetItem(page, itemid);
     507                 : 
     508            1726 :             if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY,
     509                 :                             false, false) == InvalidOffsetNumber)
     510 UBC           0 :                 elog(ERROR, "deduplication failed to add highkey");
     511                 :         }
     512                 : 
     513 CBC        2006 :         intervals = (BTDedupInterval *) ptr;
     514            2006 :         for (offnum = minoff;
     515          462869 :              offnum <= maxoff;
     516          460863 :              offnum = OffsetNumberNext(offnum))
     517                 :         {
     518          460863 :             ItemId      itemid = PageGetItemId(page, offnum);
     519          460863 :             IndexTuple  itup = (IndexTuple) PageGetItem(page, itemid);
     520                 : 
     521          460863 :             if (offnum == minoff)
     522            2006 :                 _bt_dedup_start_pending(state, itup, offnum);
     523          458857 :             else if (state->nintervals < xlrec->nintervals &&
     524          344070 :                      state->baseoff == intervals[state->nintervals].baseoff &&
     525          123674 :                      state->nitems < intervals[state->nintervals].nitems)
     526                 :             {
     527           80914 :                 if (!_bt_dedup_save_htid(state, itup))
     528 UBC           0 :                     elog(ERROR, "deduplication failed to add heap tid to pending posting list");
     529                 :             }
     530                 :             else
     531                 :             {
     532 CBC      377943 :                 _bt_dedup_finish_pending(newpage, state);
     533          377943 :                 _bt_dedup_start_pending(state, itup, offnum);
     534                 :             }
     535                 :         }
     536                 : 
     537            2006 :         _bt_dedup_finish_pending(newpage, state);
     538            2006 :         Assert(state->nintervals == xlrec->nintervals);
     539            2006 :         Assert(memcmp(state->intervals, intervals,
     540                 :                       state->nintervals * sizeof(BTDedupInterval)) == 0);
     541                 : 
     542            2006 :         if (P_HAS_GARBAGE(opaque))
     543                 :         {
     544 UBC           0 :             BTPageOpaque nopaque = BTPageGetOpaque(newpage);
     545                 : 
     546               0 :             nopaque->btpo_flags &= ~BTP_HAS_GARBAGE;
     547                 :         }
     548                 : 
     549 CBC        2006 :         PageRestoreTempPage(newpage, page);
     550            2006 :         PageSetLSN(page, lsn);
     551            2006 :         MarkBufferDirty(buf);
     552                 :     }
     553                 : 
     554            2010 :     if (BufferIsValid(buf))
     555            2010 :         UnlockReleaseBuffer(buf);
     556            2010 : }
     557                 : 
     558                 : static void
     559             124 : btree_xlog_updates(Page page, OffsetNumber *updatedoffsets,
     560                 :                    xl_btree_update *updates, int nupdated)
     561                 : {
     562                 :     BTVacuumPosting vacposting;
     563                 :     IndexTuple  origtuple;
     564                 :     ItemId      itemid;
     565                 :     Size        itemsz;
     566                 : 
     567            6673 :     for (int i = 0; i < nupdated; i++)
     568                 :     {
     569            6549 :         itemid = PageGetItemId(page, updatedoffsets[i]);
     570            6549 :         origtuple = (IndexTuple) PageGetItem(page, itemid);
     571                 : 
     572            6549 :         vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) +
     573            6549 :                             updates->ndeletedtids * sizeof(uint16));
     574            6549 :         vacposting->updatedoffset = updatedoffsets[i];
     575            6549 :         vacposting->itup = origtuple;
     576            6549 :         vacposting->ndeletedtids = updates->ndeletedtids;
     577            6549 :         memcpy(vacposting->deletetids,
     578                 :                (char *) updates + SizeOfBtreeUpdate,
     579            6549 :                updates->ndeletedtids * sizeof(uint16));
     580                 : 
     581            6549 :         _bt_update_posting(vacposting);
     582                 : 
     583                 :         /* Overwrite updated version of tuple */
     584            6549 :         itemsz = MAXALIGN(IndexTupleSize(vacposting->itup));
     585            6549 :         if (!PageIndexTupleOverwrite(page, updatedoffsets[i],
     586            6549 :                                      (Item) vacposting->itup, itemsz))
     587 UBC           0 :             elog(PANIC, "failed to update partially dead item");
     588                 : 
     589 CBC        6549 :         pfree(vacposting->itup);
     590            6549 :         pfree(vacposting);
     591                 : 
     592                 :         /* advance to next xl_btree_update from array */
     593            6549 :         updates = (xl_btree_update *)
     594            6549 :             ((char *) updates + SizeOfBtreeUpdate +
     595            6549 :              updates->ndeletedtids * sizeof(uint16));
     596                 :     }
     597             124 : }
     598                 : 
     599                 : static void
     600             916 : btree_xlog_vacuum(XLogReaderState *record)
     601                 : {
     602             916 :     XLogRecPtr  lsn = record->EndRecPtr;
     603             916 :     xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
     604                 :     Buffer      buffer;
     605                 :     Page        page;
     606                 :     BTPageOpaque opaque;
     607                 : 
     608                 :     /*
     609                 :      * We need to take a cleanup lock here, just like btvacuumpage(). However,
     610                 :      * it isn't necessary to exhaustively get a cleanup lock on every block in
     611                 :      * the index during recovery (just getting a cleanup lock on pages with
     612                 :      * items to kill suffices).  See nbtree/README for details.
     613                 :      */
     614             916 :     if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
     615                 :         == BLK_NEEDS_REDO)
     616                 :     {
     617             791 :         char       *ptr = XLogRecGetBlockData(record, 0, NULL);
     618                 : 
     619             791 :         page = (Page) BufferGetPage(buffer);
     620                 : 
     621             791 :         if (xlrec->nupdated > 0)
     622                 :         {
     623                 :             OffsetNumber *updatedoffsets;
     624                 :             xl_btree_update *updates;
     625                 : 
     626              25 :             updatedoffsets = (OffsetNumber *)
     627              25 :                 (ptr + xlrec->ndeleted * sizeof(OffsetNumber));
     628              25 :             updates = (xl_btree_update *) ((char *) updatedoffsets +
     629              25 :                                            xlrec->nupdated *
     630                 :                                            sizeof(OffsetNumber));
     631                 : 
     632              25 :             btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
     633                 :         }
     634                 : 
     635             791 :         if (xlrec->ndeleted > 0)
     636             791 :             PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
     637                 : 
     638                 :         /*
     639                 :          * Mark the page as not containing any LP_DEAD items --- see comments
     640                 :          * in _bt_delitems_vacuum().
     641                 :          */
     642             791 :         opaque = BTPageGetOpaque(page);
     643             791 :         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
     644                 : 
     645             791 :         PageSetLSN(page, lsn);
     646             791 :         MarkBufferDirty(buffer);
     647                 :     }
     648             916 :     if (BufferIsValid(buffer))
     649             916 :         UnlockReleaseBuffer(buffer);
     650             916 : }
     651                 : 
     652                 : static void
     653             645 : btree_xlog_delete(XLogReaderState *record)
     654                 : {
     655             645 :     XLogRecPtr  lsn = record->EndRecPtr;
     656             645 :     xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
     657                 :     Buffer      buffer;
     658                 :     Page        page;
     659                 :     BTPageOpaque opaque;
     660                 : 
     661                 :     /*
     662                 :      * If we have any conflict processing to do, it must happen before we
     663                 :      * update the page
     664                 :      */
     665             645 :     if (InHotStandby)
     666                 :     {
     667                 :         RelFileLocator rlocator;
     668                 : 
     669 GNC         643 :         XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
     670                 : 
     671             643 :         ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
     672             643 :                                             xlrec->isCatalogRel,
     673                 :                                             rlocator);
     674 ECB             :     }
     675                 : 
     676                 :     /*
     677                 :      * We don't need to take a cleanup lock to apply these changes. See
     678                 :      * nbtree/README for details.
     679                 :      */
     680 GIC         645 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     681                 :     {
     682 CBC         645 :         char       *ptr = XLogRecGetBlockData(record, 0, NULL);
     683                 : 
     684             645 :         page = (Page) BufferGetPage(buffer);
     685                 : 
     686             645 :         if (xlrec->nupdated > 0)
     687                 :         {
     688 ECB             :             OffsetNumber *updatedoffsets;
     689                 :             xl_btree_update *updates;
     690                 : 
     691 GIC          99 :             updatedoffsets = (OffsetNumber *)
     692              99 :                 (ptr + xlrec->ndeleted * sizeof(OffsetNumber));
     693 CBC          99 :             updates = (xl_btree_update *) ((char *) updatedoffsets +
     694              99 :                                            xlrec->nupdated *
     695 ECB             :                                            sizeof(OffsetNumber));
     696                 : 
     697 GIC          99 :             btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
     698                 :         }
     699 ECB             : 
     700 GIC         645 :         if (xlrec->ndeleted > 0)
     701             616 :             PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
     702 ECB             : 
     703                 :         /* Mark the page as not containing any LP_DEAD items */
     704 GIC         645 :         opaque = BTPageGetOpaque(page);
     705             645 :         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
     706 ECB             : 
     707 CBC         645 :         PageSetLSN(page, lsn);
     708 GIC         645 :         MarkBufferDirty(buffer);
     709 ECB             :     }
     710 CBC         645 :     if (BufferIsValid(buffer))
     711 GIC         645 :         UnlockReleaseBuffer(buffer);
     712 CBC         645 : }
     713 ECB             : 
     714                 : static void
     715 GIC         618 : btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
     716                 : {
     717 CBC         618 :     XLogRecPtr  lsn = record->EndRecPtr;
     718 GIC         618 :     xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
     719 ECB             :     Buffer      buffer;
     720                 :     Page        page;
     721                 :     BTPageOpaque pageop;
     722                 :     IndexTupleData trunctuple;
     723                 : 
     724                 :     /*
     725                 :      * In normal operation, we would lock all the pages this WAL record
     726                 :      * touches before changing any of them.  In WAL replay, it should be okay
     727                 :      * to lock just one page at a time, since no concurrent index updates can
     728                 :      * be happening, and readers should not care whether they arrive at the
     729                 :      * target page or not (since it's surely empty).
     730                 :      */
     731                 : 
     732                 :     /* to-be-deleted subtree's parent page */
     733 GIC         618 :     if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
     734                 :     {
     735 ECB             :         OffsetNumber poffset;
     736                 :         ItemId      itemid;
     737                 :         IndexTuple  itup;
     738                 :         OffsetNumber nextoffset;
     739                 :         BlockNumber rightsib;
     740                 : 
     741 GIC         617 :         page = (Page) BufferGetPage(buffer);
     742             617 :         pageop = BTPageGetOpaque(page);
     743 ECB             : 
     744 CBC         617 :         poffset = xlrec->poffset;
     745                 : 
     746             617 :         nextoffset = OffsetNumberNext(poffset);
     747 GIC         617 :         itemid = PageGetItemId(page, nextoffset);
     748 CBC         617 :         itup = (IndexTuple) PageGetItem(page, itemid);
     749             617 :         rightsib = BTreeTupleGetDownLink(itup);
     750 ECB             : 
     751 CBC         617 :         itemid = PageGetItemId(page, poffset);
     752 GIC         617 :         itup = (IndexTuple) PageGetItem(page, itemid);
     753 CBC         617 :         BTreeTupleSetDownLink(itup, rightsib);
     754             617 :         nextoffset = OffsetNumberNext(poffset);
     755             617 :         PageIndexTupleDelete(page, nextoffset);
     756 ECB             : 
     757 CBC         617 :         PageSetLSN(page, lsn);
     758 GIC         617 :         MarkBufferDirty(buffer);
     759 ECB             :     }
     760                 : 
     761                 :     /*
     762                 :      * Don't need to couple cross-level locks in REDO routines, so release
     763                 :      * lock on internal page immediately
     764                 :      */
     765 GIC         618 :     if (BufferIsValid(buffer))
     766             618 :         UnlockReleaseBuffer(buffer);
     767 ECB             : 
     768                 :     /* Rewrite the leaf page as a halfdead page */
     769 GIC         618 :     buffer = XLogInitBufferForRedo(record, 0);
     770             618 :     page = (Page) BufferGetPage(buffer);
     771 ECB             : 
     772 CBC         618 :     _bt_pageinit(page, BufferGetPageSize(buffer));
     773 GIC         618 :     pageop = BTPageGetOpaque(page);
     774 ECB             : 
     775 CBC         618 :     pageop->btpo_prev = xlrec->leftblk;
     776 GIC         618 :     pageop->btpo_next = xlrec->rightblk;
     777 CBC         618 :     pageop->btpo_level = 0;
     778             618 :     pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
     779             618 :     pageop->btpo_cycleid = 0;
     780 ECB             : 
     781                 :     /*
     782                 :      * Construct a dummy high key item that points to top parent page (value
     783                 :      * is InvalidBlockNumber when the top parent page is the leaf page itself)
     784                 :      */
     785 GIC         618 :     MemSet(&trunctuple, 0, sizeof(IndexTupleData));
     786             618 :     trunctuple.t_info = sizeof(IndexTupleData);
     787 CBC         618 :     BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);
     788 ECB             : 
     789 CBC         618 :     if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
     790                 :                     false, false) == InvalidOffsetNumber)
     791 LBC           0 :         elog(ERROR, "could not add dummy high key to half-dead page");
     792                 : 
     793 GBC         618 :     PageSetLSN(page, lsn);
     794 GIC         618 :     MarkBufferDirty(buffer);
     795 CBC         618 :     UnlockReleaseBuffer(buffer);
     796             618 : }
     797 ECB             : 
     798                 : 
     799                 : static void
     800 GIC         665 : btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
     801                 : {
     802 CBC         665 :     XLogRecPtr  lsn = record->EndRecPtr;
     803 GIC         665 :     xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
     804 ECB             :     BlockNumber leftsib;
     805                 :     BlockNumber rightsib;
     806                 :     uint32      level;
     807                 :     bool        isleaf;
     808                 :     FullTransactionId safexid;
     809                 :     Buffer      leftbuf;
     810                 :     Buffer      target;
     811                 :     Buffer      rightbuf;
     812                 :     Page        page;
     813                 :     BTPageOpaque pageop;
     814                 : 
     815 GIC         665 :     leftsib = xlrec->leftsib;
     816             665 :     rightsib = xlrec->rightsib;
     817 CBC         665 :     level = xlrec->level;
     818             665 :     isleaf = (level == 0);
     819             665 :     safexid = xlrec->safexid;
     820 ECB             : 
     821                 :     /* No leaftopparent for level 0 (leaf page) or level 1 target */
     822 GIC         665 :     Assert(!BlockNumberIsValid(xlrec->leaftopparent) || level > 1);
     823                 : 
     824 ECB             :     /*
     825                 :      * In normal operation, we would lock all the pages this WAL record
     826                 :      * touches before changing any of them.  In WAL replay, we at least lock
     827                 :      * the pages in the same standard left-to-right order (leftsib, target,
     828                 :      * rightsib), and don't release the sibling locks until the target is
     829                 :      * marked deleted.
     830                 :      */
     831                 : 
     832                 :     /* Fix right-link of left sibling, if any */
     833 GIC         665 :     if (leftsib != P_NONE)
     834                 :     {
     835 CBC          53 :         if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
     836                 :         {
     837              53 :             page = (Page) BufferGetPage(leftbuf);
     838 GIC          53 :             pageop = BTPageGetOpaque(page);
     839 CBC          53 :             pageop->btpo_next = rightsib;
     840 ECB             : 
     841 CBC          53 :             PageSetLSN(page, lsn);
     842 GIC          53 :             MarkBufferDirty(leftbuf);
     843 ECB             :         }
     844                 :     }
     845                 :     else
     846 GIC         612 :         leftbuf = InvalidBuffer;
     847                 : 
     848 ECB             :     /* Rewrite target page as empty deleted page */
     849 GIC         665 :     target = XLogInitBufferForRedo(record, 0);
     850             665 :     page = (Page) BufferGetPage(target);
     851 ECB             : 
     852 CBC         665 :     _bt_pageinit(page, BufferGetPageSize(target));
     853 GIC         665 :     pageop = BTPageGetOpaque(page);
     854 ECB             : 
     855 CBC         665 :     pageop->btpo_prev = leftsib;
     856 GIC         665 :     pageop->btpo_next = rightsib;
     857 CBC         665 :     pageop->btpo_level = level;
     858             665 :     BTPageSetDeleted(page, safexid);
     859             665 :     if (isleaf)
     860             618 :         pageop->btpo_flags |= BTP_LEAF;
     861             665 :     pageop->btpo_cycleid = 0;
     862 ECB             : 
     863 CBC         665 :     PageSetLSN(page, lsn);
     864 GIC         665 :     MarkBufferDirty(target);
     865 ECB             : 
     866                 :     /* Fix left-link of right sibling */
     867 GIC         665 :     if (XLogReadBufferForRedo(record, 2, &rightbuf) == BLK_NEEDS_REDO)
     868                 :     {
     869 CBC         646 :         page = (Page) BufferGetPage(rightbuf);
     870 GIC         646 :         pageop = BTPageGetOpaque(page);
     871 CBC         646 :         pageop->btpo_prev = leftsib;
     872 ECB             : 
     873 CBC         646 :         PageSetLSN(page, lsn);
     874 GIC         646 :         MarkBufferDirty(rightbuf);
     875 ECB             :     }
     876                 : 
     877                 :     /* Release siblings */
     878 GIC         665 :     if (BufferIsValid(leftbuf))
     879              53 :         UnlockReleaseBuffer(leftbuf);
     880 CBC         665 :     if (BufferIsValid(rightbuf))
     881             665 :         UnlockReleaseBuffer(rightbuf);
     882 ECB             : 
     883                 :     /* Release target */
     884 GIC         665 :     UnlockReleaseBuffer(target);
     885                 : 
     886 ECB             :     /*
     887                 :      * If we deleted a parent of the targeted leaf page, instead of the leaf
     888                 :      * itself, update the leaf to point to the next remaining child in the
     889                 :      * to-be-deleted subtree
     890                 :      */
     891 GIC         665 :     if (XLogRecHasBlockRef(record, 3))
     892                 :     {
     893 ECB             :         /*
     894                 :          * There is no real data on the page, so we just re-create it from
     895                 :          * scratch using the information from the WAL record.
     896                 :          *
     897                 :          * Note that we don't end up here when the target page is also the
     898                 :          * leafbuf page.  There is no need to add a dummy hikey item with a
     899                 :          * top parent link when deleting leafbuf because it's the last page
     900                 :          * we'll delete in the subtree undergoing deletion.
     901                 :          */
     902                 :         Buffer      leafbuf;
     903                 :         IndexTupleData trunctuple;
     904                 : 
     905 GIC          47 :         Assert(!isleaf);
     906                 : 
     907 CBC          47 :         leafbuf = XLogInitBufferForRedo(record, 3);
     908 GIC          47 :         page = (Page) BufferGetPage(leafbuf);
     909 ECB             : 
     910 CBC          47 :         _bt_pageinit(page, BufferGetPageSize(leafbuf));
     911 GIC          47 :         pageop = BTPageGetOpaque(page);
     912 ECB             : 
     913 CBC          47 :         pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
     914 GIC          47 :         pageop->btpo_prev = xlrec->leafleftsib;
     915 CBC          47 :         pageop->btpo_next = xlrec->leafrightsib;
     916              47 :         pageop->btpo_level = 0;
     917              47 :         pageop->btpo_cycleid = 0;
     918 ECB             : 
     919                 :         /* Add a dummy hikey item */
     920 GIC          94 :         MemSet(&trunctuple, 0, sizeof(IndexTupleData));
     921              47 :         trunctuple.t_info = sizeof(IndexTupleData);
     922 CBC          47 :         BTreeTupleSetTopParent(&trunctuple, xlrec->leaftopparent);
     923 ECB             : 
     924 CBC          47 :         if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
     925                 :                         false, false) == InvalidOffsetNumber)
     926 LBC           0 :             elog(ERROR, "could not add dummy high key to half-dead page");
     927                 : 
     928 GBC          47 :         PageSetLSN(page, lsn);
     929 GIC          47 :         MarkBufferDirty(leafbuf);
     930 CBC          47 :         UnlockReleaseBuffer(leafbuf);
     931 ECB             :     }
     932                 : 
     933                 :     /* Update metapage if needed */
     934 GIC         665 :     if (info == XLOG_BTREE_UNLINK_PAGE_META)
     935               9 :         _bt_restore_meta(record, 4);
     936 CBC         665 : }
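
For context, here is a minimal sketch of how the top-parent link stored in that dummy high key is read back during original execution at the later stage of page deletion. It is illustrative only: the real reader is _bt_unlink_halfdead_page() in nbtpage.c, and "page" is assumed to already point at the half-dead leaf.

/* Illustrative sketch: recover the to-be-deleted subtree's topmost parent
 * from the half-dead leaf's high key, i.e. the link that was stashed with
 * BTreeTupleSetTopParent() as in the redo routine above. */
ItemId      hitemid = PageGetItemId(page, P_HIKEY);
IndexTuple  leafhikey = (IndexTuple) PageGetItem(page, hitemid);
BlockNumber subtree_topparent = BTreeTupleGetTopParent(leafhikey);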
     937 ECB             : 
     938                 : static void
     939 GIC         573 : btree_xlog_newroot(XLogReaderState *record)
     940                 : {
     941 CBC         573 :     XLogRecPtr  lsn = record->EndRecPtr;
     942 GIC         573 :     xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
     943 ECB             :     Buffer      buffer;
     944                 :     Page        page;
     945                 :     BTPageOpaque pageop;
     946                 :     char       *ptr;
     947                 :     Size        len;
     948                 : 
     949 GIC         573 :     buffer = XLogInitBufferForRedo(record, 0);
     950             573 :     page = (Page) BufferGetPage(buffer);
     951 ECB             : 
     952 CBC         573 :     _bt_pageinit(page, BufferGetPageSize(buffer));
     953 GIC         573 :     pageop = BTPageGetOpaque(page);
     954 ECB             : 
     955 CBC         573 :     pageop->btpo_flags = BTP_ROOT;
     956 GIC         573 :     pageop->btpo_prev = pageop->btpo_next = P_NONE;
     957 CBC         573 :     pageop->btpo_level = xlrec->level;
     958             573 :     if (xlrec->level == 0)
     959             521 :         pageop->btpo_flags |= BTP_LEAF;
     960             573 :     pageop->btpo_cycleid = 0;
     961 ECB             : 
     962 CBC         573 :     if (xlrec->level > 0)
     963                 :     {
     964              52 :         ptr = XLogRecGetBlockData(record, 0, &len);
     965 GIC          52 :         _bt_restore_page(page, ptr, len);
     966 ECB             : 
     967                 :         /* Clear the incomplete-split flag in left child */
     968 GIC          52 :         _bt_clear_incomplete_split(record, 1);
     969                 :     }
     970 ECB             : 
     971 GIC         573 :     PageSetLSN(page, lsn);
     972             573 :     MarkBufferDirty(buffer);
     973 CBC         573 :     UnlockReleaseBuffer(buffer);
     974 ECB             : 
     975 CBC         573 :     _bt_restore_meta(record, 2);
     976 GIC         573 : }
     977 ECB             : 
     978                 : /*
     979                 :  * In general VACUUM must defer recycling as a way of avoiding certain race
     980                 :  * conditions.  Deleted pages contain a safexid value that is used by VACUUM
     981                 :  * to determine whether or not it's safe to place a page that was deleted by
     982                 :  * VACUUM earlier into the FSM now.  See nbtree/README.
     983                 :  *
     984                 :  * As far as any backend operating during original execution is concerned, the
     985                 :  * FSM is a cache of recycle-safe pages; the mere presence of the page in the
     986                 :  * FSM indicates that the page must already be safe to recycle (actually,
     987                 :  * _bt_getbuf() verifies it's safe using BTPageIsRecyclable(), but that's just
     988                 :  * because it would be unwise to completely trust the FSM, given its current
     989                 :  * limitations).
     990                 :  *
     991                 :  * This isn't sufficient to prevent similar concurrent recycling race
     992                 :  * conditions during Hot Standby, though.  For that we need to log a
     993                 :  * xl_btree_reuse_page record at the point that a page is actually recycled
     994                 :  * and reused for an entirely unrelated page inside _bt_split().  These
     995                 :  * records include the same safexid value from the original deleted page,
     996                 :  * stored in the record's snapshotConflictHorizon field.
     997                 :  *
     998                 :  * The GlobalVisCheckRemovableFullXid() test in BTPageIsRecyclable() is used
     999                 :  * to determine if it's safe to recycle a page.  This mirrors our own test:
    1000                 :  * the PGPROC->xmin > limitXmin test inside GetConflictingVirtualXIDs().
    1001                 :  * Consequently, one XID value achieves the same exclusion effect on primary
    1002                 :  * and standby.
    1003                 :  */
    1004                 : static void
    1005 UIC           0 : btree_xlog_reuse_page(XLogReaderState *record)
    1006                 : {
    1007 UBC           0 :     xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
    1008                 : 
    1009               0 :     if (InHotStandby)
    1010 UNC           0 :         ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
    1011               0 :                                                    xlrec->isCatalogRel,
    1012                 :                                                    xlrec->locator);
    1013 UBC           0 : }
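
To make the preceding comment concrete, here is a minimal sketch of the recyclability test it describes, in the same shape as (but simplified from) BTPageIsRecyclable(); the function name is invented, and the real check carries additional assertions and caveats.

#include "access/nbtree.h"
#include "utils/snapmgr.h"

/* Illustrative sketch: a deleted page may go to the FSM once its safexid is
 * older than every snapshot that could still need to visit it.  During Hot
 * Standby the same safexid travels in xl_btree_reuse_page's
 * snapshotConflictHorizon and is checked against standby snapshots instead. */
static bool
deleted_page_safe_to_recycle(Page page, Relation heaprel)
{
    BTPageOpaque opaque = BTPageGetOpaque(page);

    if (P_ISDELETED(opaque) && !P_ISHALFDEAD(opaque))
        return GlobalVisCheckRemovableFullXid(heaprel, BTPageGetDeleteXid(page));

    return false;
}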
    1014 EUB             : 
    1015                 : void
    1016 GBC      473737 : btree_redo(XLogReaderState *record)
    1017                 : {
    1018 GIC      473737 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    1019 ECB             :     MemoryContext oldCtx;
    1020                 : 
    1021 CBC      473737 :     oldCtx = MemoryContextSwitchTo(opCtx);
    1022 GIC      473737 :     switch (info)
    1023                 :     {
    1024 CBC      463604 :         case XLOG_BTREE_INSERT_LEAF:
    1025          463604 :             btree_xlog_insert(true, false, false, record);
    1026 GIC      463604 :             break;
    1027 CBC        1276 :         case XLOG_BTREE_INSERT_UPPER:
    1028            1276 :             btree_xlog_insert(false, false, false, record);
    1029            1276 :             break;
    1030               4 :         case XLOG_BTREE_INSERT_META:
    1031               4 :             btree_xlog_insert(false, true, false, record);
    1032               4 :             break;
    1033             170 :         case XLOG_BTREE_SPLIT_L:
    1034             170 :             btree_xlog_split(true, record);
    1035             170 :             break;
    1036            1213 :         case XLOG_BTREE_SPLIT_R:
    1037            1213 :             btree_xlog_split(false, record);
    1038            1213 :             break;
    1039            2031 :         case XLOG_BTREE_INSERT_POST:
    1040            2031 :             btree_xlog_insert(true, false, true, record);
    1041            2031 :             break;
    1042            2010 :         case XLOG_BTREE_DEDUP:
    1043            2010 :             btree_xlog_dedup(record);
    1044            2010 :             break;
    1045             916 :         case XLOG_BTREE_VACUUM:
    1046             916 :             btree_xlog_vacuum(record);
    1047             916 :             break;
    1048             645 :         case XLOG_BTREE_DELETE:
    1049             645 :             btree_xlog_delete(record);
    1050             645 :             break;
    1051             618 :         case XLOG_BTREE_MARK_PAGE_HALFDEAD:
    1052             618 :             btree_xlog_mark_page_halfdead(info, record);
    1053             618 :             break;
    1054             665 :         case XLOG_BTREE_UNLINK_PAGE:
    1055 ECB             :         case XLOG_BTREE_UNLINK_PAGE_META:
    1056 CBC         665 :             btree_xlog_unlink_page(info, record);
    1057             665 :             break;
    1058 GIC         573 :         case XLOG_BTREE_NEWROOT:
    1059 CBC         573 :             btree_xlog_newroot(record);
    1060             573 :             break;
    1061 LBC           0 :         case XLOG_BTREE_REUSE_PAGE:
    1062               0 :             btree_xlog_reuse_page(record);
    1063               0 :             break;
    1064 GBC          12 :         case XLOG_BTREE_META_CLEANUP:
    1065              12 :             _bt_restore_meta(record, 0);
    1066              12 :             break;
    1067 LBC           0 :         default:
    1068               0 :             elog(PANIC, "btree_redo: unknown op code %u", info);
    1069 ECB             :     }
    1070 GBC      473737 :     MemoryContextSwitchTo(oldCtx);
    1071          473737 :     MemoryContextReset(opCtx);
    1072 GIC      473737 : }
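
For orientation, the opcodes dispatched on above are attached by the original operations when they build their WAL records. A rough, illustrative fragment of that insert-side pattern follows; it is not code from this file, "buf", "page", "itup", and "newitemoff" stand in for the caller's state, and the real leaf-insert caller lives in nbtinsert.c.

/* Illustrative fragment: how a leaf insert stamps XLOG_BTREE_INSERT_LEAF
 * onto its record, which btree_redo() later decodes from the info byte. */
xl_btree_insert xlrec;
XLogRecPtr      recptr;

xlrec.offnum = newitemoff;

XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) itup, IndexTupleSize(itup));

recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT_LEAF);
PageSetLSN(page, recptr);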
    1073 ECB             : 
    1074                 : void
    1075 CBC         141 : btree_xlog_startup(void)
    1076                 : {
    1077 GIC         141 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
    1078 ECB             :                                   "Btree recovery temporary context",
    1079                 :                                   ALLOCSET_DEFAULT_SIZES);
    1080 CBC         141 : }
    1081                 : 
    1082                 : void
    1083             108 : btree_xlog_cleanup(void)
    1084                 : {
    1085 GIC         108 :     MemoryContextDelete(opCtx);
    1086 CBC         108 :     opCtx = NULL;
    1087 GIC         108 : }
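
These startup/cleanup hooks, together with btree_redo() above and btree_mask() below, are not called directly; recovery reaches them through the btree resource manager entry. A sketch of that registration follows (per src/include/access/rmgrlist.dat; treat the trailing decode slot, shown here as NULL, as an assumption).

/* Sketch of the resource-manager entry that wires up this file's callbacks. */
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify,
        btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)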
    1088 ECB             : 
    1089                 : /*
    1090                 :  * Mask a btree page before performing consistency checks on it.
    1091                 :  */
    1092                 : void
    1093 UIC           0 : btree_mask(char *pagedata, BlockNumber blkno)
    1094                 : {
    1095               0 :     Page        page = (Page) pagedata;
    1096 EUB             :     BTPageOpaque maskopaq;
    1097                 : 
    1098 UBC           0 :     mask_page_lsn_and_checksum(page);
    1099                 : 
    1100 UIC           0 :     mask_page_hint_bits(page);
    1101 UBC           0 :     mask_unused_space(page);
    1102                 : 
    1103               0 :     maskopaq = BTPageGetOpaque(page);
    1104 EUB             : 
    1105 UIC           0 :     if (P_ISLEAF(maskopaq))
    1106 EUB             :     {
    1107                 :         /*
    1108                 :          * In btree leaf pages, it is possible to modify the LP_FLAGS without
    1109                 :          * emitting any WAL record. Hence, mask the line pointer flags. See
    1110                 :          * _bt_killitems(), _bt_check_unique() for details.
    1111                 :          */
    1112 UIC           0 :         mask_lp_flags(page);
    1113                 :     }
    1114                 : 
    1115 EUB             :     /*
    1116                 :      * BTP_HAS_GARBAGE is just an un-logged hint bit. So, mask it. See
    1117                 :      * _bt_delete_or_dedup_one_page(), _bt_killitems(), and _bt_check_unique()
    1118                 :      * for details.
    1119                 :      */
    1120 UIC           0 :     maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;
    1121                 : 
    1122                 :     /*
    1123 EUB             :      * During replay of a btree page split, we don't set the BTP_SPLIT_END
    1124                 :      * flag on the right sibling, and we initialize that page's cycle_id to 0.
    1125                 :      * See btree_xlog_split() for details.
    1126                 :      */
    1127 UIC           0 :     maskopaq->btpo_flags &= ~BTP_SPLIT_END;
    1128               0 :     maskopaq->btpo_cycleid = 0;
    1129               0 : }
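
btree_mask() only comes into play when WAL consistency checking covers this resource manager, e.g. wal_consistency_checking = 'btree' (or 'all') in postgresql.conf. With that setting, qualifying records also carry a full-page image; during redo the replayed page and that image are both passed through this routine and then compared byte for byte, so the fields noted above that may legitimately differ are neutralized first.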
        

Generated by: LCOV version v1.16-55-g56c0a2a