LCOV - differential code coverage report
Current view: top level - src/backend/access/nbtree - nbtxlog.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 92.5 % 505 467 2 7 10 19 8 76 3 380 10 78 1 2
Current Date: 2023-04-08 17:13:01 Functions: 88.2 % 17 15 2 6 1 8 2 6
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 50.0 % 2 1 1 1
Legend: Lines: hit not hit (120,180] days: 50.0 % 2 1 1 1
(240..) days: 92.8 % 501 465 7 10 19 8 76 1 380 7 78
Function coverage date bins:
(240..) days: 60.0 % 25 15 2 6 1 8 2 6

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * nbtxlog.c
                                  4                 :  *    WAL replay logic for btrees.
                                  5                 :  *
                                  6                 :  *
                                  7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  8                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  9                 :  *
                                 10                 :  * IDENTIFICATION
                                 11                 :  *    src/backend/access/nbtree/nbtxlog.c
                                 12                 :  *
                                 13                 :  *-------------------------------------------------------------------------
                                 14                 :  */
                                 15                 : #include "postgres.h"
                                 16                 : 
                                 17                 : #include "access/bufmask.h"
                                 18                 : #include "access/nbtree.h"
                                 19                 : #include "access/nbtxlog.h"
                                 20                 : #include "access/transam.h"
                                 21                 : #include "access/xlog.h"
                                 22                 : #include "access/xlogutils.h"
                                 23                 : #include "miscadmin.h"
                                 24                 : #include "storage/procarray.h"
                                 25                 : #include "utils/memutils.h"
                                 26                 : 
                                 27                 : static MemoryContext opCtx;     /* working memory for operations */
                                 28                 : 
                                 29                 : /*
                                 30                 :  * _bt_restore_page -- re-enter all the index tuples on a page
                                 31                 :  *
                                 32                 :  * The page is freshly init'd, and *from (length len) is a copy of what
                                 33                 :  * had been its upper part (pd_upper to pd_special).  We assume that the
                                 34                 :  * tuples had been added to the page in item-number order, and therefore
                                 35                 :  * the one with highest item number appears first (lowest on the page).
                                 36                 :  */
                                 37                 : static void
 7352 tgl                        38 CBC        1435 : _bt_restore_page(Page page, char *from, int len)
                                 39                 : {
                                 40                 :     IndexTupleData itupdata;
                                 41                 :     Size        itemsz;
                                 42            1435 :     char       *end = from + len;
                                 43                 :     Item        items[MaxIndexTuplesPerPage];
                                 44                 :     uint16      itemsizes[MaxIndexTuplesPerPage];
                                 45                 :     int         i;
                                 46                 :     int         nitems;
                                 47                 : 
                                 48                 :     /*
                                 49                 :      * To get the items back in the original order, we add them to the page in
                                 50                 :      * reverse.  To figure out where one tuple ends and another begins, we
                                 51                 :      * have to scan them in forward order first.
                                 52                 :      */
 3274 heikki.linnakangas         53            1435 :     i = 0;
                                 54           92524 :     while (from < end)
                                 55                 :     {
                                 56                 :         /*
                                 57                 :          * As we step through the items, 'from' won't always be properly
                                 58                 :          * aligned, so we need to use memcpy().  Further, we use Item (which
                                 59                 :          * is just a char*) here for our items array for the same reason;
                                 60                 :          * wouldn't want the compiler or anyone thinking that an item is
                                 61                 :          * aligned when it isn't.
                                 62                 :          */
 6283 tgl                        63           91089 :         memcpy(&itupdata, from, sizeof(IndexTupleData));
 1866                            64           91089 :         itemsz = IndexTupleSize(&itupdata);
 7352                            65           91089 :         itemsz = MAXALIGN(itemsz);
                                 66                 : 
 3274 heikki.linnakangas         67           91089 :         items[i] = (Item) from;
                                 68           91089 :         itemsizes[i] = itemsz;
                                 69           91089 :         i++;
                                 70                 : 
                                 71           91089 :         from += itemsz;
                                 72                 :     }
                                 73            1435 :     nitems = i;
                                 74                 : 
                                 75           92524 :     for (i = nitems - 1; i >= 0; i--)
                                 76                 :     {
                                 77           91089 :         if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
                                 78                 :                         false, false) == InvalidOffsetNumber)
 5911 bruce                      79 UBC           0 :             elog(PANIC, "_bt_restore_page: cannot add item to page");
                                 80                 :     }
 7352 tgl                        81 CBC        1435 : }
                                 82                 : 
                                 83                 : static void
 3062 heikki.linnakangas         84             598 : _bt_restore_meta(XLogReaderState *record, uint8 block_id)
                                 85                 : {
                                 86             598 :     XLogRecPtr  lsn = record->EndRecPtr;
                                 87                 :     Buffer      metabuf;
                                 88                 :     Page        metapg;
                                 89                 :     BTMetaPageData *md;
                                 90                 :     BTPageOpaque pageop;
                                 91                 :     xl_btree_metadata *xlrec;
                                 92                 :     char       *ptr;
                                 93                 :     Size        len;
                                 94                 : 
                                 95             598 :     metabuf = XLogInitBufferForRedo(record, block_id);
                                 96             598 :     ptr = XLogRecGetBlockData(record, block_id, &len);
                                 97                 : 
                                 98             598 :     Assert(len == sizeof(xl_btree_metadata));
                                 99             598 :     Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
                                100             598 :     xlrec = (xl_btree_metadata *) ptr;
 2545 kgrittn                   101             598 :     metapg = BufferGetPage(metabuf);
                                102                 : 
 7352 tgl                       103             598 :     _bt_pageinit(metapg, BufferGetPageSize(metabuf));
                                104                 : 
                                105             598 :     md = BTPageGetMeta(metapg);
 6885                           106             598 :     md->btm_magic = BTREE_MAGIC;
 1481 pg                        107             598 :     md->btm_version = xlrec->version;
 3062 heikki.linnakangas        108             598 :     md->btm_root = xlrec->root;
                                109             598 :     md->btm_level = xlrec->level;
                                110             598 :     md->btm_fastroot = xlrec->fastroot;
                                111             598 :     md->btm_fastlevel = xlrec->fastlevel;
                                112                 :     /* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
 1361 pg                        113             598 :     Assert(md->btm_version >= BTREE_NOVAC_VERSION);
  774                           114             598 :     md->btm_last_cleanup_num_delpages = xlrec->last_cleanup_num_delpages;
  760                           115             598 :     md->btm_last_cleanup_num_heap_tuples = -1.0;
 1138                           116             598 :     md->btm_allequalimage = xlrec->allequalimage;
                                117                 : 
  373 michael                   118             598 :     pageop = BTPageGetOpaque(metapg);
 7352 tgl                       119             598 :     pageop->btpo_flags = BTP_META;
                                120                 : 
                                121                 :     /*
                                122                 :      * Set pd_lower just past the end of the metadata.  This is essential,
                                123                 :      * because without doing so, metadata will be lost if xlog.c compresses
                                124                 :      * the page.
                                125                 :      */
 6520                           126             598 :     ((PageHeader) metapg)->pd_lower =
                                127             598 :         ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
                                128                 : 
 7352                           129             598 :     PageSetLSN(metapg, lsn);
 6218                           130             598 :     MarkBufferDirty(metabuf);
                                131             598 :     UnlockReleaseBuffer(metabuf);
 7352                           132             598 : }
                                133                 : 
                                134                 : /*
                                135                 :  * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
                                136                 :  *
                                137                 :  * This is a common subroutine of the redo functions of all the WAL record
                                138                 :  * types that can insert a downlink: insert, split, and newroot.
                                139                 :  */
                                140                 : static void
 3062 heikki.linnakangas        141            1383 : _bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
                                142                 : {
                                143            1383 :     XLogRecPtr  lsn = record->EndRecPtr;
                                144                 :     Buffer      buf;
                                145                 : 
                                146            1383 :     if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
                                147                 :     {
 2545 kgrittn                   148            1383 :         Page        page = (Page) BufferGetPage(buf);
  373 michael                   149            1383 :         BTPageOpaque pageop = BTPageGetOpaque(page);
                                150                 : 
 2029 tgl                       151            1383 :         Assert(P_INCOMPLETE_SPLIT(pageop));
 3161 heikki.linnakangas        152            1383 :         pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
                                153                 : 
                                154            1383 :         PageSetLSN(page, lsn);
                                155            1383 :         MarkBufferDirty(buf);
                                156                 :     }
                                157            1383 :     if (BufferIsValid(buf))
                                158            1383 :         UnlockReleaseBuffer(buf);
 3309                           159            1383 : }
                                160                 : 
                                161                 : static void
 1138 pg                        162          466915 : btree_xlog_insert(bool isleaf, bool ismeta, bool posting,
                                163                 :                   XLogReaderState *record)
                                164                 : {
 3062 heikki.linnakangas        165          466915 :     XLogRecPtr  lsn = record->EndRecPtr;
 7352 tgl                       166          466915 :     xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
                                167                 :     Buffer      buffer;
                                168                 :     Page        page;
                                169                 : 
                                170                 :     /*
                                171                 :      * Insertion to an internal page finishes an incomplete split at the child
                                172                 :      * level.  Clear the incomplete-split flag in the child.  Note: during
                                173                 :      * normal operation, the child and parent pages are locked at the same
                                174                 :      * time (the locks are coupled), so that clearing the flag and inserting
                                175                 :      * the downlink appear atomic to other backends.  We don't bother with
                                176                 :      * that during replay, because readers don't care about the
                                177                 :      * incomplete-split flag and there cannot be updates happening.
                                178                 :      */
 3309 heikki.linnakangas        179          466915 :     if (!isleaf)
 3062                           180            1280 :         _bt_clear_incomplete_split(record, 1);
                                181          466915 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
                                182                 :     {
                                183                 :         Size        datalen;
                                184          464549 :         char       *datapos = XLogRecGetBlockData(record, 0, &datalen);
                                185                 : 
 2545 kgrittn                   186          464549 :         page = BufferGetPage(buffer);
                                187                 : 
 1138 pg                        188          464549 :         if (!posting)
                                189                 :         {
                                190                 :             /* Simple retail insertion */
                                191          462525 :             if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
                                192                 :                             false, false) == InvalidOffsetNumber)
 1138 pg                        193 UBC           0 :                 elog(PANIC, "failed to add new item");
                                194                 :         }
                                195                 :         else
                                196                 :         {
                                197                 :             ItemId      itemid;
                                198                 :             IndexTuple  oposting,
                                199                 :                         newitem,
                                200                 :                         nposting;
                                201                 :             uint16      postingoff;
                                202                 : 
                                203                 :             /*
                                204                 :              * A posting list split occurred during leaf page insertion.  WAL
                                205                 :              * record data will start with an offset number representing the
                                206                 :              * point in an existing posting list that a split occurs at.
                                207                 :              *
                                208                 :              * Use _bt_swap_posting() to repeat posting list split steps from
                                209                 :              * primary.  Note that newitem from WAL record is 'orignewitem',
                                210                 :              * not the final version of newitem that is actually inserted on
                                211                 :              * page.
                                212                 :              */
 1138 pg                        213 CBC        2024 :             postingoff = *((uint16 *) datapos);
                                214            2024 :             datapos += sizeof(uint16);
                                215            2024 :             datalen -= sizeof(uint16);
                                216                 : 
                                217            2024 :             itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum));
                                218            2024 :             oposting = (IndexTuple) PageGetItem(page, itemid);
                                219                 : 
                                220                 :             /* Use mutable, aligned newitem copy in _bt_swap_posting() */
                                221            2024 :             Assert(isleaf && postingoff > 0);
                                222            2024 :             newitem = CopyIndexTuple((IndexTuple) datapos);
                                223            2024 :             nposting = _bt_swap_posting(newitem, oposting, postingoff);
                                224                 : 
                                225                 :             /* Replace existing posting list with post-split version */
                                226            2024 :             memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting)));
                                227                 : 
                                228                 :             /* Insert "final" new item (not orignewitem from WAL stream) */
                                229            2024 :             Assert(IndexTupleSize(newitem) == datalen);
                                230            2024 :             if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum,
                                231                 :                             false, false) == InvalidOffsetNumber)
 1138 pg                        232 UBC           0 :                 elog(PANIC, "failed to add posting split new item");
                                233                 :         }
                                234                 : 
 3161 heikki.linnakangas        235 CBC      464549 :         PageSetLSN(page, lsn);
                                236          464549 :         MarkBufferDirty(buffer);
                                237                 :     }
                                238          466915 :     if (BufferIsValid(buffer))
                                239          466915 :         UnlockReleaseBuffer(buffer);
                                240                 : 
                                241                 :     /*
                                242                 :      * Note: in normal operation, we'd update the metapage while still holding
                                243                 :      * lock on the page we inserted into.  But during replay it's not
                                244                 :      * necessary to hold that lock, since no other index updates can be
                                245                 :      * happening concurrently, and readers will cope fine with following an
                                246                 :      * obsolete link from the metapage.
                                247                 :      */
 6516 tgl                       248          466915 :     if (ismeta)
 3062 heikki.linnakangas        249               4 :         _bt_restore_meta(record, 2);
 7352 tgl                       250          466915 : }
                                251                 : 
                                252                 : static void
 1091 pg                        253            1383 : btree_xlog_split(bool newitemonleft, XLogReaderState *record)
                                254                 : {
 3062 heikki.linnakangas        255            1383 :     XLogRecPtr  lsn = record->EndRecPtr;
 7352 tgl                       256            1383 :     xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
 3309 heikki.linnakangas        257            1383 :     bool        isleaf = (xlrec->level == 0);
                                258                 :     Buffer      buf;
                                259                 :     Buffer      rbuf;
                                260                 :     Page        rpage;
                                261                 :     BTPageOpaque ropaque;
                                262                 :     char       *datapos;
                                263                 :     Size        datalen;
                                264                 :     BlockNumber origpagenumber;
                                265                 :     BlockNumber rightpagenumber;
                                266                 :     BlockNumber spagenumber;
                                267                 : 
  975 pg                        268            1383 :     XLogRecGetBlockTag(record, 0, NULL, NULL, &origpagenumber);
                                269            1383 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &rightpagenumber);
  363 tgl                       270            1383 :     if (!XLogRecGetBlockTagExtended(record, 2, NULL, NULL, &spagenumber, NULL))
  975 pg                        271             849 :         spagenumber = P_NONE;
                                272                 : 
                                273                 :     /*
                                274                 :      * Clear the incomplete split flag on the appropriate child page one level
                                275                 :      * down when origpage/buf is an internal page (there must have been
                                276                 :      * cascading page splits during original execution in the event of an
                                277                 :      * internal page split).  This is like the corresponding btree_xlog_insert
                                278                 :      * call for internal pages.  We're not clearing the incomplete split flag
                                279                 :      * for the current page split here (you can think of this as part of the
                                280                 :      * insert of newitem that the page split action needs to perform in
                                281                 :      * passing).
                                282                 :      *
                                283                 :      * Like in btree_xlog_insert, this can be done before locking other pages.
                                284                 :      * We never need to couple cross-level locks in REDO routines.
                                285                 :      */
 3309 heikki.linnakangas        286            1383 :     if (!isleaf)
 3062                           287              51 :         _bt_clear_incomplete_split(record, 3);
                                288                 : 
                                289                 :     /* Reconstruct right (new) sibling page from scratch */
                                290            1383 :     rbuf = XLogInitBufferForRedo(record, 1);
                                291            1383 :     datapos = XLogRecGetBlockData(record, 1, &datalen);
 2545 kgrittn                   292            1383 :     rpage = (Page) BufferGetPage(rbuf);
                                293                 : 
 5904 bruce                     294            1383 :     _bt_pageinit(rpage, BufferGetPageSize(rbuf));
  373 michael                   295            1383 :     ropaque = BTPageGetOpaque(rpage);
                                296                 : 
  975 pg                        297            1383 :     ropaque->btpo_prev = origpagenumber;
                                298            1383 :     ropaque->btpo_next = spagenumber;
  774                           299            1383 :     ropaque->btpo_level = xlrec->level;
 3309 heikki.linnakangas        300            1383 :     ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
 5904 bruce                     301            1383 :     ropaque->btpo_cycleid = 0;
                                302                 : 
                                303            1383 :     _bt_restore_page(rpage, datapos, datalen);
                                304                 : 
                                305            1383 :     PageSetLSN(rpage, lsn);
                                306            1383 :     MarkBufferDirty(rbuf);
                                307                 : 
                                308                 :     /* Now reconstruct original page (left half of split) */
  975 pg                        309            1383 :     if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
                                310                 :     {
                                311                 :         /*
                                312                 :          * To retain the same physical order of the tuples that they had, we
                                313                 :          * initialize a temporary empty page for the left page and add all the
                                314                 :          * items to that in item number order.  This mirrors how _bt_split()
                                315                 :          * works.  Retaining the same physical order makes WAL consistency
                                316                 :          * checking possible.  See also _bt_restore_page(), which does the
                                317                 :          * same for the right page.
                                318                 :          */
                                319            1376 :         Page        origpage = (Page) BufferGetPage(buf);
  373 michael                   320            1376 :         BTPageOpaque oopaque = BTPageGetOpaque(origpage);
                                321                 :         OffsetNumber off;
 1481 pg                        322            1376 :         IndexTuple  newitem = NULL,
 1138                           323            1376 :                     left_hikey = NULL,
                                324            1376 :                     nposting = NULL;
 1481                           325            1376 :         Size        newitemsz = 0,
                                326            1376 :                     left_hikeysz = 0;
                                327                 :         Page        leftpage;
                                328                 :         OffsetNumber leftoff,
 1138                           329            1376 :                     replacepostingoff = InvalidOffsetNumber;
                                330                 : 
 3062 heikki.linnakangas        331            1376 :         datapos = XLogRecGetBlockData(record, 0, &datalen);
                                332                 : 
 1091 pg                        333            1376 :         if (newitemonleft || xlrec->postingoff != 0)
                                334                 :         {
 1866 tgl                       335             167 :             newitem = (IndexTuple) datapos;
 3062 heikki.linnakangas        336             167 :             newitemsz = MAXALIGN(IndexTupleSize(newitem));
                                337             167 :             datapos += newitemsz;
                                338             167 :             datalen -= newitemsz;
                                339                 : 
 1138 pg                        340             167 :             if (xlrec->postingoff != 0)
                                341                 :             {
                                342                 :                 ItemId      itemid;
                                343                 :                 IndexTuple  oposting;
                                344                 : 
                                345                 :                 /* Posting list must be at offset number before new item's */
                                346               4 :                 replacepostingoff = OffsetNumberPrev(xlrec->newitemoff);
                                347                 : 
                                348                 :                 /* Use mutable, aligned newitem copy in _bt_swap_posting() */
                                349               4 :                 newitem = CopyIndexTuple(newitem);
  975                           350               4 :                 itemid = PageGetItemId(origpage, replacepostingoff);
                                351               4 :                 oposting = (IndexTuple) PageGetItem(origpage, itemid);
 1138                           352               4 :                 nposting = _bt_swap_posting(newitem, oposting,
                                353               4 :                                             xlrec->postingoff);
                                354                 :             }
                                355                 :         }
                                356                 : 
                                357                 :         /*
                                358                 :          * Extract left hikey and its size.  We assume that 16-bit alignment
                                359                 :          * is enough to apply IndexTupleSize (since it's fetching from a
                                360                 :          * uint16 field).
                                361                 :          */
 1481                           362            1376 :         left_hikey = (IndexTuple) datapos;
                                363            1376 :         left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
                                364            1376 :         datapos += left_hikeysz;
                                365            1376 :         datalen -= left_hikeysz;
                                366                 : 
 3062 heikki.linnakangas        367            1376 :         Assert(datalen == 0);
                                368                 : 
  975 pg                        369            1376 :         leftpage = PageGetTempPageCopySpecial(origpage);
                                370                 : 
                                371                 :         /* Add high key tuple from WAL record to temp page */
 3161 heikki.linnakangas        372            1376 :         leftoff = P_HIKEY;
  975 pg                        373            1376 :         if (PageAddItem(leftpage, (Item) left_hikey, left_hikeysz, P_HIKEY,
                                374                 :                         false, false) == InvalidOffsetNumber)
  975 pg                        375 UBC           0 :             elog(ERROR, "failed to add high key to left page after split");
 3161 heikki.linnakangas        376 CBC        1376 :         leftoff = OffsetNumberNext(leftoff);
                                377                 : 
  975 pg                        378          306572 :         for (off = P_FIRSTDATAKEY(oopaque); off < xlrec->firstrightoff; off++)
                                379                 :         {
                                380                 :             ItemId      itemid;
                                381                 :             Size        itemsz;
                                382                 :             IndexTuple  item;
                                383                 : 
                                384                 :             /* Add replacement posting list when required */
 1138                           385          305196 :             if (off == replacepostingoff)
                                386                 :             {
 1091                           387               4 :                 Assert(newitemonleft ||
                                388                 :                        xlrec->firstrightoff == xlrec->newitemoff);
  975                           389               4 :                 if (PageAddItem(leftpage, (Item) nposting,
                                390                 :                                 MAXALIGN(IndexTupleSize(nposting)), leftoff,
                                391                 :                                 false, false) == InvalidOffsetNumber)
 1138 pg                        392 UBC           0 :                     elog(ERROR, "failed to add new posting list item to left page after split");
 1138 pg                        393 CBC           4 :                 leftoff = OffsetNumberNext(leftoff);
                                394               4 :                 continue;       /* don't insert oposting */
                                395                 :             }
                                396                 : 
                                397                 :             /* add the new item if it was inserted on left page */
 1091                           398          305192 :             else if (newitemonleft && off == xlrec->newitemoff)
                                399                 :             {
  975                           400             144 :                 if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
                                401                 :                                 false, false) == InvalidOffsetNumber)
 3161 heikki.linnakangas        402 UBC           0 :                     elog(ERROR, "failed to add new item to left page after split");
 3274 heikki.linnakangas        403 CBC         144 :                 leftoff = OffsetNumberNext(leftoff);
                                404                 :             }
                                405                 : 
  975 pg                        406          305192 :             itemid = PageGetItemId(origpage, off);
 3161 heikki.linnakangas        407          305192 :             itemsz = ItemIdGetLength(itemid);
  975 pg                        408          305192 :             item = (IndexTuple) PageGetItem(origpage, itemid);
                                409          305192 :             if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                                410                 :                             false, false) == InvalidOffsetNumber)
 3161 heikki.linnakangas        411 UBC           0 :                 elog(ERROR, "failed to add old item to left page after split");
 3161 heikki.linnakangas        412 CBC      305192 :             leftoff = OffsetNumberNext(leftoff);
                                413                 :         }
                                414                 : 
                                415                 :         /* cope with possibility that newitem goes at the end */
 1091 pg                        416            1376 :         if (newitemonleft && off == xlrec->newitemoff)
                                417                 :         {
  975                           418              22 :             if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
                                419                 :                             false, false) == InvalidOffsetNumber)
 3161 heikki.linnakangas        420 UBC           0 :                 elog(ERROR, "failed to add new item to left page after split");
 3161 heikki.linnakangas        421 CBC          22 :             leftoff = OffsetNumberNext(leftoff);
                                422                 :         }
                                423                 : 
  975 pg                        424            1376 :         PageRestoreTempPage(leftpage, origpage);
                                425                 : 
                                426                 :         /* Fix opaque fields */
                                427            1376 :         oopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
 3161 heikki.linnakangas        428            1376 :         if (isleaf)
  975 pg                        429            1325 :             oopaque->btpo_flags |= BTP_LEAF;
                                430            1376 :         oopaque->btpo_next = rightpagenumber;
                                431            1376 :         oopaque->btpo_cycleid = 0;
                                432                 : 
                                433            1376 :         PageSetLSN(origpage, lsn);
                                434            1376 :         MarkBufferDirty(buf);
                                435                 :     }
                                436                 : 
                                437                 :     /* Fix left-link of the page to the right of the new right sibling */
                                438            1383 :     if (spagenumber != P_NONE)
                                439                 :     {
                                440                 :         Buffer      sbuf;
                                441                 : 
                                442             534 :         if (XLogReadBufferForRedo(record, 2, &sbuf) == BLK_NEEDS_REDO)
                                443                 :         {
                                444             476 :             Page        spage = (Page) BufferGetPage(sbuf);
  373 michael                   445             476 :             BTPageOpaque spageop = BTPageGetOpaque(spage);
                                446                 : 
  975 pg                        447             476 :             spageop->btpo_prev = rightpagenumber;
                                448                 : 
                                449             476 :             PageSetLSN(spage, lsn);
                                450             476 :             MarkBufferDirty(sbuf);
                                451                 :         }
                                452             534 :         if (BufferIsValid(sbuf))
                                453             534 :             UnlockReleaseBuffer(sbuf);
                                454                 :     }
                                455                 : 
                                456                 :     /*
                                457                 :      * Finally, release the remaining buffers.  sbuf, rbuf, and buf must be
                                458                 :      * released together, so that readers cannot observe inconsistencies.
                                459                 :      */
                                460            1383 :     UnlockReleaseBuffer(rbuf);
                                461            1383 :     if (BufferIsValid(buf))
                                462            1383 :         UnlockReleaseBuffer(buf);
 7352 tgl                       463            1383 : }
                                 464                 : 
                                                      /*
                                                       * btree_xlog_dedup -- redo routine for a btree deduplication record.
                                                       *
                                                       * record: the WAL record being replayed; block 0 is the page to rebuild,
                                                       * and its block data is read as an array of BTDedupInterval entries
                                                       * (xlrec->nintervals of them).
                                                       *
                                                       * When block 0 needs redo, the page is rebuilt on a temporary page: the
                                                       * item at P_HIKEY is copied first (non-rightmost pages only), then every
                                                       * data item from P_FIRSTDATAKEY to the max offset is either added to the
                                                       * pending posting list or used to start a new pending group, as dictated
                                                       * by the recorded intervals.  The temp page is then passed to
                                                       * PageRestoreTempPage() along with the original page, and the page LSN is
                                                       * set to the record's end pointer.
                                                       */
                                 465                 : static void
 1138 pg                        466            2010 : btree_xlog_dedup(XLogReaderState *record)
                                 467                 : {
                                 468            2010 :     XLogRecPtr  lsn = record->EndRecPtr;  /* stamped on the page after redo */
                                 469            2010 :     xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record);
                                 470                 :     Buffer      buf;
                                 471                 : 
                                 472            2010 :     if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
                                 473                 :     {
                                 474            2006 :         char       *ptr = XLogRecGetBlockData(record, 0, NULL);  /* read below as BTDedupInterval[] */
                                 475            2006 :         Page        page = (Page) BufferGetPage(buf);
  373 michael                   476            2006 :         BTPageOpaque opaque = BTPageGetOpaque(page);
                                 477                 :         OffsetNumber offnum,
                                 478                 :                     minoff,
                                 479                 :                     maxoff;
                                 480                 :         BTDedupState state;
                                 481                 :         BTDedupInterval *intervals;
                                 482                 :         Page        newpage;
                                 483                 : 
                                                              /* Set up an empty dedup state; nothing is pending yet */
 1138 pg                        484            2006 :         state = (BTDedupState) palloc(sizeof(BTDedupStateData));
                                 485            2006 :         state->deduplicate = true;   /* unused */
 1024                           486            2006 :         state->nmaxitems = 0;    /* unused */
                                 487                 :         /* Conservatively use larger maxpostingsize than primary */
 1138                           488            2006 :         state->maxpostingsize = BTMaxItemSize(page);
                                 489            2006 :         state->base = NULL;
                                 490            2006 :         state->baseoff = InvalidOffsetNumber;
                                 491            2006 :         state->basetupsize = 0;
                                 492            2006 :         state->htids = palloc(state->maxpostingsize);
                                 493            2006 :         state->nhtids = 0;
                                 494            2006 :         state->nitems = 0;
                                 495            2006 :         state->phystupsize = 0;
                                 496            2006 :         state->nintervals = 0;
                                 497                 : 
                                 498            2006 :         minoff = P_FIRSTDATAKEY(opaque);
                                 499            2006 :         maxoff = PageGetMaxOffsetNumber(page);
                                 500            2006 :         newpage = PageGetTempPageCopySpecial(page);  /* temp page the result is built on */
                                 501                 : 
                                                              /* Non-rightmost page: carry the item at P_HIKEY over unchanged */
                                 502            2006 :         if (!P_RIGHTMOST(opaque))
                                 503                 :         {
                                 504            1726 :             ItemId      itemid = PageGetItemId(page, P_HIKEY);
                                 505            1726 :             Size        itemsz = ItemIdGetLength(itemid);
                                 506            1726 :             IndexTuple  item = (IndexTuple) PageGetItem(page, itemid);
                                 507                 : 
                                 508            1726 :             if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY,
                                 509                 :                             false, false) == InvalidOffsetNumber)
 1138 pg                        510 UBC           0 :                 elog(ERROR, "deduplication failed to add highkey");
                                 511                 :         }
                                 512                 : 
                                                              /*
                                                               * Walk every data item in offset order.  An item joins the pending
                                                               * group only while the group's base offset matches the next
                                                               * recorded interval and that interval still has room; otherwise
                                                               * the pending group is finished and the item starts a new one.
                                                               */
 1138 pg                        513 CBC        2006 :         intervals = (BTDedupInterval *) ptr;
                                 514            2006 :         for (offnum = minoff;
                                 515          462869 :              offnum <= maxoff;
                                 516          460863 :              offnum = OffsetNumberNext(offnum))
                                 517                 :         {
                                 518          460863 :             ItemId      itemid = PageGetItemId(page, offnum);
                                 519          460863 :             IndexTuple  itup = (IndexTuple) PageGetItem(page, itemid);
                                 520                 : 
                                 521          460863 :             if (offnum == minoff)
                                 522            2006 :                 _bt_dedup_start_pending(state, itup, offnum);  /* first data item always starts a group */
                                 523          458857 :             else if (state->nintervals < xlrec->nintervals &&
                                 524          344070 :                      state->baseoff == intervals[state->nintervals].baseoff &&
                                 525          123674 :                      state->nitems < intervals[state->nintervals].nitems)  /* recorded interval not yet full */
                                 526                 :             {
                                 527           80914 :                 if (!_bt_dedup_save_htid(state, itup))
 1138 pg                        528 UBC           0 :                     elog(ERROR, "deduplication failed to add heap tid to pending posting list");
                                 529                 :             }
                                 530                 :             else
                                 531                 :             {
 1138 pg                        532 CBC      377943 :                 _bt_dedup_finish_pending(newpage, state);  /* finish the current pending group */
                                 533          377943 :                 _bt_dedup_start_pending(state, itup, offnum);  /* itup starts the next one */
                                 534                 :             }
                                 535                 :         }
                                 536                 : 
                                 537            2006 :         _bt_dedup_finish_pending(newpage, state);  /* finish whatever is still pending after the loop */
                                                              /* Replay is expected to reproduce exactly the intervals in the record */
                                 538            2006 :         Assert(state->nintervals == xlrec->nintervals);
                                 539            2006 :         Assert(memcmp(state->intervals, intervals,
                                 540                 :                       state->nintervals * sizeof(BTDedupInterval)) == 0);
                                 541                 : 
                                                              /* If the original page has BTP_HAS_GARBAGE set, clear it on the rebuilt page */
                                 542            2006 :         if (P_HAS_GARBAGE(opaque))
                                 543                 :         {
  373 michael                   544 UBC           0 :             BTPageOpaque nopaque = BTPageGetOpaque(newpage);
                                 545                 : 
 1138 pg                        546               0 :             nopaque->btpo_flags &= ~BTP_HAS_GARBAGE;
                                 547                 :         }
                                 548                 : 
 1138 pg                        549 CBC        2006 :         PageRestoreTempPage(newpage, page);
                                 550            2006 :         PageSetLSN(page, lsn);
                                 551            2006 :         MarkBufferDirty(buf);
                                 552                 :     }
                                 553                 : 
                                 554            2010 :     if (BufferIsValid(buf))  /* release only if the redo call handed back a buffer */
                                 555            2010 :         UnlockReleaseBuffer(buf);
                                 556            2010 : }
                                 557                 : 
                                                      /*
                                                       * btree_xlog_updates -- overwrite tuples on a page during replay.
                                                       *
                                                       * page:           page whose tuples are overwritten in place
                                                       * updatedoffsets: offset number of each tuple to update (nupdated entries)
                                                       * updates:        packed sequence of xl_btree_update entries; each entry
                                                       *                 is SizeOfBtreeUpdate bytes followed by ndeletedtids
                                                       *                 uint16 values
                                                       * nupdated:       number of entries to process
                                                       *
                                                       * For each entry a transient BTVacuumPosting is built, passed to
                                                       * _bt_update_posting(), and the resulting tuple is written over the
                                                       * original with PageIndexTupleOverwrite().  Failure to overwrite raises
                                                       * PANIC.
                                                       */
                                 558                 : static void
  816                           559             124 : btree_xlog_updates(Page page, OffsetNumber *updatedoffsets,
                                 560                 :                    xl_btree_update *updates, int nupdated)
                                 561                 : {
                                 562                 :     BTVacuumPosting vacposting;
                                 563                 :     IndexTuple  origtuple;
                                 564                 :     ItemId      itemid;
                                 565                 :     Size        itemsz;
                                 566                 : 
                                 567            6673 :     for (int i = 0; i < nupdated; i++)
                                 568                 :     {
                                 569            6549 :         itemid = PageGetItemId(page, updatedoffsets[i]);
                                 570            6549 :         origtuple = (IndexTuple) PageGetItem(page, itemid);
                                 571                 : 
                                                              /* Allocate room for the header plus this entry's uint16 array */
                                 572            6549 :         vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) +
                                 573            6549 :                             updates->ndeletedtids * sizeof(uint16));
                                 574            6549 :         vacposting->updatedoffset = updatedoffsets[i];
                                 575            6549 :         vacposting->itup = origtuple;
                                 576            6549 :         vacposting->ndeletedtids = updates->ndeletedtids;
                                 577            6549 :         memcpy(vacposting->deletetids,
                                 578                 :                (char *) updates + SizeOfBtreeUpdate,  /* array sits right after the fixed-size part */
                                 579            6549 :                updates->ndeletedtids * sizeof(uint16));
                                 580                 : 
                                                              /*
                                                               * NOTE(review): presumably this replaces vacposting->itup with a
                                                               * newly allocated tuple, since itup is pfree'd below while it was
                                                               * set to point into the page above -- confirm in
                                                               * _bt_update_posting().
                                                               */
                                 581            6549 :         _bt_update_posting(vacposting);
                                 582                 : 
                                 583                 :         /* Overwrite updated version of tuple */
                                 584            6549 :         itemsz = MAXALIGN(IndexTupleSize(vacposting->itup));
                                 585            6549 :         if (!PageIndexTupleOverwrite(page, updatedoffsets[i],
                                 586            6549 :                                      (Item) vacposting->itup, itemsz))
  816 pg                        587 UBC           0 :             elog(PANIC, "failed to update partially dead item");
                                 588                 : 
  816 pg                        589 CBC        6549 :         pfree(vacposting->itup);
                                 590            6549 :         pfree(vacposting);
                                 591                 : 
                                 592                 :         /* advance to next xl_btree_update from array */
                                 593            6549 :         updates = (xl_btree_update *)
                                 594            6549 :             ((char *) updates + SizeOfBtreeUpdate +
                                 595            6549 :              updates->ndeletedtids * sizeof(uint16));  /* entries are variable-length */
                                 596                 :     }
                                 597             124 : }
                                 598                 : 
                                                      /*
                                                       * btree_xlog_vacuum -- redo routine for a btree vacuum record.
                                                       *
                                                       * record: the WAL record being replayed; block 0 is the affected page.
                                                       *
                                                       * Block 0 data is read in this order: xlrec->ndeleted OffsetNumbers of
                                                       * items to delete, then xlrec->nupdated OffsetNumbers of tuples to
                                                       * update, then the xl_btree_update entries themselves.  When the block
                                                       * needs redo, updates are applied first, then the deletions, and finally
                                                       * BTP_HAS_GARBAGE is cleared and the page LSN is set.
                                                       */
                                 599                 : static void
 3062 heikki.linnakangas        600             916 : btree_xlog_vacuum(XLogReaderState *record)
                                 601                 : {
                                 602             916 :     XLogRecPtr  lsn = record->EndRecPtr;
 1207 pg                        603             916 :     xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
                                 604                 :     Buffer      buffer;
                                 605                 :     Page        page;
                                 606                 :     BTPageOpaque opaque;
                                 607                 : 
                                 608                 :     /*
                                 609                 :      * We need to take a cleanup lock here, just like btvacuumpage(). However,
                                 610                 :      * it isn't necessary to exhaustively get a cleanup lock on every block in
                                 611                 :      * the index during recovery (just getting a cleanup lock on pages with
                                 612                 :      * items to kill suffices).  See nbtree/README for details.
                                 613                 :      */
                                                          /* NOTE(review): the `true` argument presumably requests that cleanup lock -- confirm */
 3062 heikki.linnakangas        614             916 :     if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
                                 615                 :         == BLK_NEEDS_REDO)
                                 616                 :     {
 1207 pg                        617             791 :         char       *ptr = XLogRecGetBlockData(record, 0, NULL);  /* starts with the deleted offsets */
                                 618                 : 
 2545 kgrittn                   619             791 :         page = (Page) BufferGetPage(buffer);
                                 620                 : 
                                                              /*
                                                               * NOTE(review): updates run before deletions; presumably the
                                                               * updated offsets refer to the page before any item is removed --
                                                               * confirm against the code that writes this record.
                                                               */
 1138 pg                        621             791 :         if (xlrec->nupdated > 0)
                                 622                 :         {
                                 623                 :             OffsetNumber *updatedoffsets;
                                 624                 :             xl_btree_update *updates;
                                 625                 : 
                                 626              25 :             updatedoffsets = (OffsetNumber *)
                                 627              25 :                 (ptr + xlrec->ndeleted * sizeof(OffsetNumber));  /* skip past the deleted offsets */
                                 628              25 :             updates = (xl_btree_update *) ((char *) updatedoffsets +
                                 629              25 :                                            xlrec->nupdated *
                                 630                 :                                            sizeof(OffsetNumber));  /* update entries follow the updated offsets */
                                 631                 : 
  816                           632              25 :             btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
                                 633                 :         }
                                 634                 : 
 1138                           635             791 :         if (xlrec->ndeleted > 0)
                                 636             791 :             PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
                                 637                 : 
                                 638                 :         /*
                                 639                 :          * Mark the page as not containing any LP_DEAD items --- see comments
                                 640                 :          * in _bt_delitems_vacuum().
                                 641                 :          */
  373 michael                   642             791 :         opaque = BTPageGetOpaque(page);
 3161 heikki.linnakangas        643             791 :         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
                                 644                 : 
                                 645             791 :         PageSetLSN(page, lsn);
                                 646             791 :         MarkBufferDirty(buffer);
                                 647                 :     }
                                 648             916 :     if (BufferIsValid(buffer))  /* release only if the redo call handed back a buffer */
                                 649             916 :         UnlockReleaseBuffer(buffer);
 4859 simon                     650             916 : }
                                 651                 : 
                                                      /*
                                                       * btree_xlog_delete -- redo routine for a btree delete record.
                                                       *
                                                       * record: the WAL record being replayed; block 0 is the affected page.
                                                       *
                                                       * In hot standby, recovery conflicts are resolved first, using the
                                                       * record's snapshotConflictHorizon and isCatalogRel together with the
                                                       * relation locator of block 0.  Then, if block 0 needs redo, the block
                                                       * data is read as xlrec->ndeleted OffsetNumbers to delete, followed by
                                                       * xlrec->nupdated OffsetNumbers to update, followed by the
                                                       * xl_btree_update entries.  Updates are applied before deletions, after
                                                       * which BTP_HAS_GARBAGE is cleared and the page LSN is set.
                                                       */
                                 652                 : static void
 3062 heikki.linnakangas        653             645 : btree_xlog_delete(XLogReaderState *record)
                                 654                 : {
                                 655             645 :     XLogRecPtr  lsn = record->EndRecPtr;
 3800 tgl                       656             645 :     xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
                                 657                 :     Buffer      buffer;
                                 658                 :     Page        page;
                                 659                 :     BTPageOpaque opaque;
                                 660                 : 
                                 661                 :     /*
                                 662                 :      * If we have any conflict processing to do, it must happen before we
                                 663                 :      * update the page
                                 664                 :      */
                                 665             645 :     if (InHotStandby)
                                 666                 :     {
                                 667                 :         RelFileLocator rlocator;
                                 668                 : 
  277 rhaas                     669 GNC         643 :         XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);  /* only the relation locator is needed */
                                 670                 : 
  143 pg                        671             643 :         ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
    2 andres                    672             643 :                                             xlrec->isCatalogRel,
                                 673                 :                                             rlocator);
 3800 tgl                       674 ECB             :     }
                                 675                 : 
                                 676                 :     /*
                                 677                 :      * We don't need to take a cleanup lock to apply these changes. See
                                 678                 :      * nbtree/README for details.
                                 679                 :      */
 3062 heikki.linnakangas        680 GIC         645 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
                                 681                 :     {
 1192 pg                        682 CBC         645 :         char       *ptr = XLogRecGetBlockData(record, 0, NULL);  /* starts with the deleted offsets */
                                 683                 : 
                                 684             645 :         page = (Page) BufferGetPage(buffer);
                                 685                 : 
                                                              /*
                                                               * NOTE(review): updates run before deletions; presumably the
                                                               * updated offsets refer to the page before any item is removed --
                                                               * confirm against the code that writes this record.
                                                               */
  816                           686             645 :         if (xlrec->nupdated > 0)
                                 687                 :         {
  816 pg                        688 ECB             :             OffsetNumber *updatedoffsets;
                                 689                 :             xl_btree_update *updates;
                                 690                 : 
  816 pg                        691 GIC          99 :             updatedoffsets = (OffsetNumber *)
                                 692              99 :                 (ptr + xlrec->ndeleted * sizeof(OffsetNumber));  /* skip past the deleted offsets */
  816 pg                        693 CBC          99 :             updates = (xl_btree_update *) ((char *) updatedoffsets +
                                 694              99 :                                            xlrec->nupdated *
  816 pg                        695 ECB             :                                            sizeof(OffsetNumber));  /* update entries follow the updated offsets */
                                 696                 : 
  816 pg                        697 GIC          99 :             btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
                                 698                 :         }
  816 pg                        699 ECB             : 
  816 pg                        700 GIC         645 :         if (xlrec->ndeleted > 0)
                                 701             616 :             PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
 7352 tgl                       702 ECB             : 
 1194 pg                        703                 :         /* Mark the page as not containing any LP_DEAD items */
  373 michael                   704 GIC         645 :         opaque = BTPageGetOpaque(page);
 3161 heikki.linnakangas        705             645 :         opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
 6102 tgl                       706 ECB             : 
 3161 heikki.linnakangas        707 CBC         645 :         PageSetLSN(page, lsn);
 3161 heikki.linnakangas        708 GIC         645 :         MarkBufferDirty(buffer);
 3161 heikki.linnakangas        709 ECB             :     }
 3161 heikki.linnakangas        710 CBC         645 :     if (BufferIsValid(buffer))  /* release only if the redo call handed back a buffer */
 3161 heikki.linnakangas        711 GIC         645 :         UnlockReleaseBuffer(buffer);
 7352 tgl                       712 CBC         645 : }
 7352 tgl                       713 ECB             : 
 7350                           714                 : static void
 3062 heikki.linnakangas        715 GIC         618 : btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
                                716                 : {
 3062 heikki.linnakangas        717 CBC         618 :     XLogRecPtr  lsn = record->EndRecPtr;
 3313 heikki.linnakangas        718 GIC         618 :     xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
 7350 tgl                       719 ECB             :     Buffer      buffer;
                                720                 :     Page        page;
                                721                 :     BTPageOpaque pageop;
                                722                 :     IndexTupleData trunctuple;
                                723                 : 
                                724                 :     /*
                                725                 :      * In normal operation, we would lock all the pages this WAL record
                                726                 :      * touches before changing any of them.  In WAL replay, it should be okay
                                727                 :      * to lock just one page at a time, since no concurrent index updates can
                                728                 :      * be happening, and readers should not care whether they arrive at the
                                729                 :      * target page or not (since it's surely empty).
                                730                 :      */
                                731                 : 
                                732                 :     /* to-be-deleted subtree's parent page */
 3062 heikki.linnakangas        733 GIC         618 :     if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
                                734                 :     {
 3161 heikki.linnakangas        735 ECB             :         OffsetNumber poffset;
                                736                 :         ItemId      itemid;
                                737                 :         IndexTuple  itup;
                                738                 :         OffsetNumber nextoffset;
                                739                 :         BlockNumber rightsib;
                                740                 : 
 2545 kgrittn                   741 GIC         617 :         page = (Page) BufferGetPage(buffer);
  373 michael                   742             617 :         pageop = BTPageGetOpaque(page);
 1207 bruce                     743 ECB             : 
 3062 heikki.linnakangas        744 CBC         617 :         poffset = xlrec->poffset;
                                745                 : 
 3161                           746             617 :         nextoffset = OffsetNumberNext(poffset);
 3161 heikki.linnakangas        747 GIC         617 :         itemid = PageGetItemId(page, nextoffset);
 3161 heikki.linnakangas        748 CBC         617 :         itup = (IndexTuple) PageGetItem(page, itemid);
 1210 pg                        749             617 :         rightsib = BTreeTupleGetDownLink(itup);
 3161 heikki.linnakangas        750 ECB             : 
 3161 heikki.linnakangas        751 CBC         617 :         itemid = PageGetItemId(page, poffset);
 3161 heikki.linnakangas        752 GIC         617 :         itup = (IndexTuple) PageGetItem(page, itemid);
 1210 pg                        753 CBC         617 :         BTreeTupleSetDownLink(itup, rightsib);
 3161 heikki.linnakangas        754             617 :         nextoffset = OffsetNumberNext(poffset);
                                755             617 :         PageIndexTupleDelete(page, nextoffset);
 3161 heikki.linnakangas        756 ECB             : 
 3161 heikki.linnakangas        757 CBC         617 :         PageSetLSN(page, lsn);
 3161 heikki.linnakangas        758 GIC         617 :         MarkBufferDirty(buffer);
 7350 tgl                       759 ECB             :     }
  975 pg                        760                 : 
                                761                 :     /*
                                762                 :      * Don't need to couple cross-level locks in REDO routines, so release
                                763                 :      * lock on internal page immediately
                                764                 :      */
 3161 heikki.linnakangas        765 GIC         618 :     if (BufferIsValid(buffer))
                                766             618 :         UnlockReleaseBuffer(buffer);
 7350 tgl                       767 ECB             : 
 3313 heikki.linnakangas        768                 :     /* Rewrite the leaf page as a halfdead page */
 3062 heikki.linnakangas        769 GIC         618 :     buffer = XLogInitBufferForRedo(record, 0);
 2545 kgrittn                   770             618 :     page = (Page) BufferGetPage(buffer);
 3313 heikki.linnakangas        771 ECB             : 
 3313 heikki.linnakangas        772 CBC         618 :     _bt_pageinit(page, BufferGetPageSize(buffer));
  373 michael                   773 GIC         618 :     pageop = BTPageGetOpaque(page);
 3313 heikki.linnakangas        774 ECB             : 
 3313 heikki.linnakangas        775 CBC         618 :     pageop->btpo_prev = xlrec->leftblk;
 3313 heikki.linnakangas        776 GIC         618 :     pageop->btpo_next = xlrec->rightblk;
  774 pg                        777 CBC         618 :     pageop->btpo_level = 0;
 3313 heikki.linnakangas        778             618 :     pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
                                779             618 :     pageop->btpo_cycleid = 0;
 3313 heikki.linnakangas        780 ECB             : 
                                781                 :     /*
                                782                 :      * Construct a dummy high key item that points to top parent page (value
                                783                 :      * is InvalidBlockNumber when the top parent page is the leaf page itself)
                                784                 :      */
 3313 heikki.linnakangas        785 GIC         618 :     MemSet(&trunctuple, 0, sizeof(IndexTupleData));
                                786             618 :     trunctuple.t_info = sizeof(IndexTupleData);
 1812 teodor                    787 CBC         618 :     BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);
 1816 teodor                    788 ECB             : 
 3313 heikki.linnakangas        789 CBC         618 :     if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                                790                 :                     false, false) == InvalidOffsetNumber)
 3313 heikki.linnakangas        791 LBC           0 :         elog(ERROR, "could not add dummy high key to half-dead page");
                                792                 : 
 3313 heikki.linnakangas        793 GBC         618 :     PageSetLSN(page, lsn);
 3313 heikki.linnakangas        794 GIC         618 :     MarkBufferDirty(buffer);
 3313 heikki.linnakangas        795 CBC         618 :     UnlockReleaseBuffer(buffer);
                                796             618 : }
 3313 heikki.linnakangas        797 ECB             : 
                                798                 : 
                                799                 : static void
 3062 heikki.linnakangas        800 GIC         665 : btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
                                801                 : {
 3062 heikki.linnakangas        802 CBC         665 :     XLogRecPtr  lsn = record->EndRecPtr;
 3313 heikki.linnakangas        803 GIC         665 :     xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
 3313 heikki.linnakangas        804 ECB             :     BlockNumber leftsib;
                                805                 :     BlockNumber rightsib;
                                806                 :     uint32      level;
                                807                 :     bool        isleaf;
                                808                 :     FullTransactionId safexid;
                                809                 :     Buffer      leftbuf;
                                810                 :     Buffer      target;
                                811                 :     Buffer      rightbuf;
                                812                 :     Page        page;
                                813                 :     BTPageOpaque pageop;
                                814                 : 
 3313 heikki.linnakangas        815 GIC         665 :     leftsib = xlrec->leftsib;
                                816             665 :     rightsib = xlrec->rightsib;
  774 pg                        817 CBC         665 :     level = xlrec->level;
                                818             665 :     isleaf = (level == 0);
                                819             665 :     safexid = xlrec->safexid;
  774 pg                        820 ECB             : 
                                821                 :     /* No leaftopparent for level 0 (leaf page) or level 1 target */
  768 pg                        822 GIC         665 :     Assert(!BlockNumberIsValid(xlrec->leaftopparent) || level > 1);
                                823                 : 
 3313 heikki.linnakangas        824 ECB             :     /*
                                825                 :      * In normal operation, we would lock all the pages this WAL record
                                826                 :      * touches before changing any of them.  In WAL replay, we at least lock
                                827                 :      * the pages in the same standard left-to-right order (leftsib, target,
                                828                 :      * rightsib), and don't release the sibling locks until the target is
                                829                 :      * marked deleted.
                                830                 :      */
                                831                 : 
                                832                 :     /* Fix right-link of left sibling, if any */
 3161 heikki.linnakangas        833 GIC         665 :     if (leftsib != P_NONE)
                                834                 :     {
  979 pg                        835 CBC          53 :         if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
                                836                 :         {
                                837              53 :             page = (Page) BufferGetPage(leftbuf);
  373 michael                   838 GIC          53 :             pageop = BTPageGetOpaque(page);
 3161 heikki.linnakangas        839 CBC          53 :             pageop->btpo_next = rightsib;
 3161 heikki.linnakangas        840 ECB             : 
 3161 heikki.linnakangas        841 CBC          53 :             PageSetLSN(page, lsn);
  979 pg                        842 GIC          53 :             MarkBufferDirty(leftbuf);
 6220 tgl                       843 ECB             :         }
                                844                 :     }
                                845                 :     else
  979 pg                        846 GIC         612 :         leftbuf = InvalidBuffer;
                                847                 : 
 7350 tgl                       848 ECB             :     /* Rewrite target page as empty deleted page */
  979 pg                        849 GIC         665 :     target = XLogInitBufferForRedo(record, 0);
                                850             665 :     page = (Page) BufferGetPage(target);
 6220 tgl                       851 ECB             : 
  979 pg                        852 CBC         665 :     _bt_pageinit(page, BufferGetPageSize(target));
  373 michael                   853 GIC         665 :     pageop = BTPageGetOpaque(page);
 7350 tgl                       854 ECB             : 
 6516 tgl                       855 CBC         665 :     pageop->btpo_prev = leftsib;
 6516 tgl                       856 GIC         665 :     pageop->btpo_next = rightsib;
  774 pg                        857 CBC         665 :     pageop->btpo_level = level;
                                858             665 :     BTPageSetDeleted(page, safexid);
                                859             665 :     if (isleaf)
  885                           860             618 :         pageop->btpo_flags |= BTP_LEAF;
 6180 tgl                       861             665 :     pageop->btpo_cycleid = 0;
 7350 tgl                       862 ECB             : 
 6516 tgl                       863 CBC         665 :     PageSetLSN(page, lsn);
  979 pg                        864 GIC         665 :     MarkBufferDirty(target);
  979 pg                        865 ECB             : 
                                866                 :     /* Fix left-link of right sibling */
  979 pg                        867 GIC         665 :     if (XLogReadBufferForRedo(record, 2, &rightbuf) == BLK_NEEDS_REDO)
                                868                 :     {
  979 pg                        869 CBC         646 :         page = (Page) BufferGetPage(rightbuf);
  373 michael                   870 GIC         646 :         pageop = BTPageGetOpaque(page);
  979 pg                        871 CBC         646 :         pageop->btpo_prev = leftsib;
  979 pg                        872 ECB             : 
  979 pg                        873 CBC         646 :         PageSetLSN(page, lsn);
  979 pg                        874 GIC         646 :         MarkBufferDirty(rightbuf);
  979 pg                        875 ECB             :     }
                                876                 : 
                                877                 :     /* Release siblings */
  979 pg                        878 GIC         665 :     if (BufferIsValid(leftbuf))
                                879              53 :         UnlockReleaseBuffer(leftbuf);
  979 pg                        880 CBC         665 :     if (BufferIsValid(rightbuf))
                                881             665 :         UnlockReleaseBuffer(rightbuf);
  979 pg                        882 ECB             : 
                                883                 :     /* Release target */
  979 pg                        884 GIC         665 :     UnlockReleaseBuffer(target);
                                885                 : 
 3313 heikki.linnakangas        886 ECB             :     /*
                                887                 :      * If we deleted a parent of the targeted leaf page, instead of the leaf
                                888                 :      * itself, update the leaf to point to the next remaining child in the
                                889                 :      * to-be-deleted subtree
                                890                 :      */
 3062 heikki.linnakangas        891 GIC         665 :     if (XLogRecHasBlockRef(record, 3))
                                892                 :     {
 3313 heikki.linnakangas        893 ECB             :         /*
                                894                 :          * There is no real data on the page, so we just re-create it from
                                895                 :          * scratch using the information from the WAL record.
                                896                 :          *
                                897                 :          * Note that we don't end up here when the target page is also the
                                898                 :          * leafbuf page.  There is no need to add a dummy hikey item with a
                                899                 :          * top parent link when deleting leafbuf because it's the last page
                                900                 :          * we'll delete in the subtree undergoing deletion.
                                901                 :          */
                                902                 :         Buffer      leafbuf;
                                903                 :         IndexTupleData trunctuple;
                                904                 : 
  774 pg                        905 GIC          47 :         Assert(!isleaf);
                                906                 : 
  979 pg                        907 CBC          47 :         leafbuf = XLogInitBufferForRedo(record, 3);
  979 pg                        908 GIC          47 :         page = (Page) BufferGetPage(leafbuf);
 3313 heikki.linnakangas        909 ECB             : 
  979 pg                        910 CBC          47 :         _bt_pageinit(page, BufferGetPageSize(leafbuf));
  373 michael                   911 GIC          47 :         pageop = BTPageGetOpaque(page);
 2840 heikki.linnakangas        912 ECB             : 
 3313 heikki.linnakangas        913 CBC          47 :         pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
 3313 heikki.linnakangas        914 GIC          47 :         pageop->btpo_prev = xlrec->leafleftsib;
 3313 heikki.linnakangas        915 CBC          47 :         pageop->btpo_next = xlrec->leafrightsib;
  774 pg                        916              47 :         pageop->btpo_level = 0;
 3313 heikki.linnakangas        917              47 :         pageop->btpo_cycleid = 0;
 3313 heikki.linnakangas        918 ECB             : 
                                919                 :         /* Add a dummy hikey item */
 3313 heikki.linnakangas        920 GIC          94 :         MemSet(&trunctuple, 0, sizeof(IndexTupleData));
                                921              47 :         trunctuple.t_info = sizeof(IndexTupleData);
  774 pg                        922 CBC          47 :         BTreeTupleSetTopParent(&trunctuple, xlrec->leaftopparent);
 1816 teodor                    923 ECB             : 
 3313 heikki.linnakangas        924 CBC          47 :         if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                                925                 :                         false, false) == InvalidOffsetNumber)
 3313 heikki.linnakangas        926 LBC           0 :             elog(ERROR, "could not add dummy high key to half-dead page");
                                927                 : 
 3313 heikki.linnakangas        928 GBC          47 :         PageSetLSN(page, lsn);
  979 pg                        929 GIC          47 :         MarkBufferDirty(leafbuf);
  979 pg                        930 CBC          47 :         UnlockReleaseBuffer(leafbuf);
 3313 heikki.linnakangas        931 ECB             :     }
                                932                 : 
                                933                 :     /* Update metapage if needed */
 3313 heikki.linnakangas        934 GIC         665 :     if (info == XLOG_BTREE_UNLINK_PAGE_META)
 3062                           935               9 :         _bt_restore_meta(record, 4);
 7350 tgl                       936 CBC         665 : }
 7350 tgl                       937 ECB             : 
 7352                           938                 : static void
 3062 heikki.linnakangas        939 GIC         573 : btree_xlog_newroot(XLogReaderState *record)
                                940                 : {
 3062 heikki.linnakangas        941 CBC         573 :     XLogRecPtr  lsn = record->EndRecPtr;
 7352 tgl                       942 GIC         573 :     xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
 7352 tgl                       943 ECB             :     Buffer      buffer;
                                944                 :     Page        page;
                                945                 :     BTPageOpaque pageop;
                                946                 :     char       *ptr;
                                947                 :     Size        len;
                                948                 : 
 3062 heikki.linnakangas        949 GIC         573 :     buffer = XLogInitBufferForRedo(record, 0);
 2545 kgrittn                   950             573 :     page = (Page) BufferGetPage(buffer);
 6220 tgl                       951 ECB             : 
 7352 tgl                       952 CBC         573 :     _bt_pageinit(page, BufferGetPageSize(buffer));
  373 michael                   953 GIC         573 :     pageop = BTPageGetOpaque(page);
 7352 tgl                       954 ECB             : 
 7352 tgl                       955 CBC         573 :     pageop->btpo_flags = BTP_ROOT;
 7352 tgl                       956 GIC         573 :     pageop->btpo_prev = pageop->btpo_next = P_NONE;
  774 pg                        957 CBC         573 :     pageop->btpo_level = xlrec->level;
 7352 tgl                       958             573 :     if (xlrec->level == 0)
                                959             521 :         pageop->btpo_flags |= BTP_LEAF;
 6180                           960             573 :     pageop->btpo_cycleid = 0;
 7352 tgl                       961 ECB             : 
 3062 heikki.linnakangas        962 CBC         573 :     if (xlrec->level > 0)
                                963                 :     {
                                964              52 :         ptr = XLogRecGetBlockData(record, 0, &len);
 3062 heikki.linnakangas        965 GIC          52 :         _bt_restore_page(page, ptr, len);
 3309 heikki.linnakangas        966 ECB             : 
                                967                 :         /* Clear the incomplete-split flag in left child */
 3062 heikki.linnakangas        968 GIC          52 :         _bt_clear_incomplete_split(record, 1);
                                969                 :     }
 7352 tgl                       970 ECB             : 
 7352 tgl                       971 GIC         573 :     PageSetLSN(page, lsn);
 6218                           972             573 :     MarkBufferDirty(buffer);
 6218 tgl                       973 CBC         573 :     UnlockReleaseBuffer(buffer);
 7352 tgl                       974 ECB             : 
 3062 heikki.linnakangas        975 CBC         573 :     _bt_restore_meta(record, 2);
 7352 tgl                       976 GIC         573 : }
 7352 tgl                       977 ECB             : 
  774 pg                        978                 : /*
                                979                 :  * In general VACUUM must defer recycling as a way of avoiding certain race
                                980                 :  * conditions.  Deleted pages contain a safexid value that is used by VACUUM
                                981                 :  * to determine whether or not it's safe to place a page that was deleted by
                                982                 :  * VACUUM earlier into the FSM now.  See nbtree/README.
                                983                 :  *
                                984                 :  * As far as any backend operating during original execution is concerned, the
                                985                 :  * FSM is a cache of recycle-safe pages; the mere presence of the page in the
                                986                 :  * FSM indicates that the page must already be safe to recycle (actually,
                                987                 :  * _bt_getbuf() verifies it's safe using BTPageIsRecyclable(), but that's just
                                988                 :  * because it would be unwise to completely trust the FSM, given its current
                                989                 :  * limitations).
                                990                 :  *
                                991                 :  * This isn't sufficient to prevent similar concurrent recycling race
                                992                 :  * conditions during Hot Standby, though.  For that we need to log a
                                993                 :  * xl_btree_reuse_page record at the point that a page is actually recycled
                                994                 :  * and reused for an entirely unrelated page inside _bt_split().  These
                                995                 :  * records include the same safexid value from the original deleted page,
                                996                 :  * stored in the record's snapshotConflictHorizon field.
                                997                 :  *
                                998                 :  * The GlobalVisCheckRemovableFullXid() test in BTPageIsRecyclable() is used
                                999                 :  * to determine if it's safe to recycle a page.  This mirrors our own test:
                               1000                 :  * the PGPROC->xmin > limitXmin test inside GetConflictingVirtualXIDs().
                               1001                 :  * Consequently, one XID value achieves the same exclusion effect on primary
                               1002                 :  * and standby.
                               1003                 :  */
                               1004                 : static void
 3062 heikki.linnakangas       1005 UIC           0 : btree_xlog_reuse_page(XLogReaderState *record)
                               1006                 : {
 3800 tgl                      1007 UBC           0 :     xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
                               1008                 : 
 4803 simon                    1009               0 :     if (InHotStandby)
  143 pg                       1010 UNC           0 :         ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
    2 andres                   1011               0 :                                                    xlrec->isCatalogRel,
                               1012                 :                                                    xlrec->locator);
 3800 tgl                      1013 UBC           0 : }
 4803 simon                    1014 EUB             : 
                               1015                 : void
 3062 heikki.linnakangas       1016 GBC      473737 : btree_redo(XLogReaderState *record)
                               1017                 : {
 3062 heikki.linnakangas       1018 GIC      473737 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 1138 pg                       1019 ECB             :     MemoryContext oldCtx;
                               1020                 : 
 1138 pg                       1021 CBC      473737 :     oldCtx = MemoryContextSwitchTo(opCtx);
 7352 tgl                      1022 GIC      473737 :     switch (info)
                               1023                 :     {
 7352 tgl                      1024 CBC      463604 :         case XLOG_BTREE_INSERT_LEAF:
 1138 pg                       1025          463604 :             btree_xlog_insert(true, false, false, record);
 7352 tgl                      1026 GIC      463604 :             break;
 7352 tgl                      1027 CBC        1276 :         case XLOG_BTREE_INSERT_UPPER:
 1138 pg                       1028            1276 :             btree_xlog_insert(false, false, false, record);
 7352 tgl                      1029            1276 :             break;
                               1030               4 :         case XLOG_BTREE_INSERT_META:
 1138 pg                       1031               4 :             btree_xlog_insert(false, true, false, record);
 7352 tgl                      1032               4 :             break;
                               1033             170 :         case XLOG_BTREE_SPLIT_L:
 1481 pg                       1034             170 :             btree_xlog_split(true, record);
 7352 tgl                      1035             170 :             break;
                               1036            1213 :         case XLOG_BTREE_SPLIT_R:
 1481 pg                       1037            1213 :             btree_xlog_split(false, record);
 7352 tgl                      1038            1213 :             break;
 1138 pg                       1039            2031 :         case XLOG_BTREE_INSERT_POST:
                               1040            2031 :             btree_xlog_insert(true, false, true, record);
                               1041            2031 :             break;
                               1042            2010 :         case XLOG_BTREE_DEDUP:
                               1043            2010 :             btree_xlog_dedup(record);
                               1044            2010 :             break;
 4859 simon                    1045             916 :         case XLOG_BTREE_VACUUM:
 3062 heikki.linnakangas       1046             916 :             btree_xlog_vacuum(record);
 4859 simon                    1047             916 :             break;
 7352 tgl                      1048             645 :         case XLOG_BTREE_DELETE:
 3062 heikki.linnakangas       1049             645 :             btree_xlog_delete(record);
 7352 tgl                      1050             645 :             break;
 3313 heikki.linnakangas       1051             618 :         case XLOG_BTREE_MARK_PAGE_HALFDEAD:
 3062                          1052             618 :             btree_xlog_mark_page_halfdead(info, record);
 3313                          1053             618 :             break;
                               1054             665 :         case XLOG_BTREE_UNLINK_PAGE:
 3313 heikki.linnakangas       1055 ECB             :         case XLOG_BTREE_UNLINK_PAGE_META:
 3062 heikki.linnakangas       1056 CBC         665 :             btree_xlog_unlink_page(info, record);
 7352 tgl                      1057             665 :             break;
 7352 tgl                      1058 GIC         573 :         case XLOG_BTREE_NEWROOT:
 3062 heikki.linnakangas       1059 CBC         573 :             btree_xlog_newroot(record);
 7352 tgl                      1060             573 :             break;
 4727 heikki.linnakangas       1061 LBC           0 :         case XLOG_BTREE_REUSE_PAGE:
 3062                          1062               0 :             btree_xlog_reuse_page(record);
 4727                          1063               0 :             break;
 1831 teodor                   1064 GBC          12 :         case XLOG_BTREE_META_CLEANUP:
                               1065              12 :             _bt_restore_meta(record, 0);
                               1066              12 :             break;
 7352 tgl                      1067 LBC           0 :         default:
                               1068               0 :             elog(PANIC, "btree_redo: unknown op code %u", info);
 7352 tgl                      1069 ECB             :     }
 1138 pg                       1070 GBC      473737 :     MemoryContextSwitchTo(oldCtx);
                               1071          473737 :     MemoryContextReset(opCtx);
 1138 pg                       1072 GIC      473737 : }
 1138 pg                       1073 ECB             : 
                               1074                 : void
 1138 pg                       1075 CBC         141 : btree_xlog_startup(void)
                               1076                 : {
 1138 pg                       1077 GIC         141 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
 1138 pg                       1078 ECB             :                                   "Btree recovery temporary context",
                               1079                 :                                   ALLOCSET_DEFAULT_SIZES);
 1138 pg                       1080 CBC         141 : }
                               1081                 : 
                               1082                 : void
                               1083             108 : btree_xlog_cleanup(void)
                               1084                 : {
 1138 pg                       1085 GIC         108 :     MemoryContextDelete(opCtx);
 1138 pg                       1086 CBC         108 :     opCtx = NULL;
 7352 tgl                      1087 GIC         108 : }
 2251 rhaas                    1088 ECB             : 
                               1089                 : /*
                               1090                 :  * Mask a btree page before performing consistency checks on it.
                               1091                 :  */
                               1092                 : void
 2251 rhaas                    1093 UIC           0 : btree_mask(char *pagedata, BlockNumber blkno)
                               1094                 : {
                               1095               0 :     Page        page = (Page) pagedata;
 2251 rhaas                    1096 EUB             :     BTPageOpaque maskopaq;
                               1097                 : 
 2025 rhaas                    1098 UBC           0 :     mask_page_lsn_and_checksum(page);
                               1099                 : 
 2251 rhaas                    1100 UIC           0 :     mask_page_hint_bits(page);
 2251 rhaas                    1101 UBC           0 :     mask_unused_space(page);
                               1102                 : 
  373 michael                  1103               0 :     maskopaq = BTPageGetOpaque(page);
 2251 rhaas                    1104 EUB             : 
  977 akorotkov                1105 UIC           0 :     if (P_ISLEAF(maskopaq))
 2251 rhaas                    1106 EUB             :     {
                               1107                 :         /*
                               1108                 :          * In btree leaf pages, it is possible to modify the LP_FLAGS without
                               1109                 :          * emitting any WAL record. Hence, mask the line pointer flags. See
                               1110                 :          * _bt_killitems(), _bt_check_unique() for details.
                               1111                 :          */
 2251 rhaas                    1112 UIC           0 :         mask_lp_flags(page);
                               1113                 :     }
                               1114                 : 
 2251 rhaas                    1115 EUB             :     /*
                               1116                 :      * BTP_HAS_GARBAGE is just an un-logged hint bit. So, mask it. See
                               1117                 :      * _bt_delete_or_dedup_one_page(), _bt_killitems(), and _bt_check_unique()
                               1118                 :      * for details.
                               1119                 :      */
 2251 rhaas                    1120 UIC           0 :     maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;
                               1121                 : 
                               1122                 :     /*
 2251 rhaas                    1123 EUB             :      * During replay of a btree page split, we don't set the BTP_SPLIT_END
                               1124                 :      * flag of the right sibling and initialize the cycle_id to 0 for the same
                               1125                 :      * page. See btree_xlog_split() for details.
                               1126                 :      */
 2251 rhaas                    1127 UIC           0 :     maskopaq->btpo_flags &= ~BTP_SPLIT_END;
                               1128               0 :     maskopaq->btpo_cycleid = 0;
                               1129               0 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a