LCOV - differential code coverage report
Current view: top level - src/backend/access/spgist - spgdoinsert.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 91.3 % 865 790 21 20 34 4 391 395 37 373 1
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 15 15 7 1 7 7
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * spgdoinsert.c
       4                 :  *    implementation of insert algorithm
       5                 :  *
       6                 :  *
       7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       8                 :  * Portions Copyright (c) 1994, Regents of the University of California
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *          src/backend/access/spgist/spgdoinsert.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : 
      16                 : #include "postgres.h"
      17                 : 
      18                 : #include "access/genam.h"
      19                 : #include "access/spgist_private.h"
      20                 : #include "access/spgxlog.h"
      21                 : #include "access/xloginsert.h"
      22                 : #include "common/pg_prng.h"
      23                 : #include "miscadmin.h"
      24                 : #include "storage/bufmgr.h"
      25                 : #include "utils/rel.h"
      26                 : 
      27                 : 
      28                 : /*
      29                 :  * SPPageDesc tracks all info about a page we are inserting into.  In some
      30                 :  * situations it actually identifies a tuple, or even a specific node within
      31                 :  * an inner tuple.  But any of the fields can be invalid.  If the buffer
      32                 :  * field is valid, it implies we hold pin and exclusive lock on that buffer.
      33                 :  * page pointer should be valid exactly when buffer is.
      34                 :  */
      35                 : typedef struct SPPageDesc
      36                 : {
      37                 :     BlockNumber blkno;          /* block number, or InvalidBlockNumber */
      38                 :     Buffer      buffer;         /* page's buffer number, or InvalidBuffer */
      39                 :     Page        page;           /* pointer to page buffer, or NULL */
      40                 :     OffsetNumber offnum;        /* offset of tuple, or InvalidOffsetNumber */
      41                 :     int         node;           /* node number within inner tuple, or -1 */
      42                 : } SPPageDesc;
      43                 : 
      44                 : 
      45                 : /*
      46                 :  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
      47                 :  * is used to update the parent inner tuple's downlink after a move or
      48                 :  * split operation.
      49                 :  */
      50                 : void
      51 CBC        5963 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
      52                 :                   BlockNumber blkno, OffsetNumber offset)
      53                 : {
      54                 :     int         i;
      55                 :     SpGistNodeTuple node;
      56                 : 
      57           30652 :     SGITITERATE(tup, i, node)
      58                 :     {
      59           30652 :         if (i == nodeN)
      60                 :         {
      61            5963 :             ItemPointerSet(&node->t_tid, blkno, offset);
      62            5963 :             return;
      63                 :         }
      64                 :     }
      65                 : 
      66 UBC           0 :     elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
      67                 :          nodeN);
      68                 : }
      69                 : 
      70                 : /*
      71                 :  * Form a new inner tuple containing one more node than the given one, with
      72                 :  * the specified label datum, inserted at offset "offset" in the node array.
      73                 :  * The new tuple's prefix is the same as the old one's.
      74                 :  *
      75                 :  * Note that the new node initially has an invalid downlink.  We'll find a
      76                 :  * page to point it to later.
      77                 :  */
      78                 : static SpGistInnerTuple
      79 CBC         783 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
      80                 : {
      81                 :     SpGistNodeTuple node,
      82                 :                *nodes;
      83                 :     int         i;
      84                 : 
      85                 :     /* if offset is negative, insert at end */
      86             783 :     if (offset < 0)
      87 UBC           0 :         offset = tuple->nNodes;
      88 CBC         783 :     else if (offset > tuple->nNodes)
      89 UBC           0 :         elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
      90                 : 
      91 CBC         783 :     nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
      92            5036 :     SGITITERATE(tuple, i, node)
      93                 :     {
      94            4253 :         if (i < offset)
      95            3646 :             nodes[i] = node;
      96                 :         else
      97             607 :             nodes[i + 1] = node;
      98                 :     }
      99                 : 
     100             783 :     nodes[offset] = spgFormNodeTuple(state, label, false);
     101                 : 
     102             783 :     return spgFormInnerTuple(state,
     103             783 :                              (tuple->prefixSize > 0),
     104             368 :                              SGITDATUM(tuple, state),
     105             783 :                              tuple->nNodes + 1,
     106                 :                              nodes);
     107                 : }
     108                 : 
     109                 : /* qsort comparator for sorting OffsetNumbers */
     110                 : static int
     111         2832902 : cmpOffsetNumbers(const void *a, const void *b)
     112                 : {
     113         2832902 :     if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
     114 UBC           0 :         return 0;
     115 CBC     2832902 :     return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
     116                 : }
     117                 : 
     118                 : /*
     119                 :  * Delete multiple tuples from an index page, preserving tuple offset numbers.
     120                 :  *
     121                 :  * The first tuple in the given list is replaced with a dead tuple of type
     122                 :  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
     123                 :  * with dead tuples of type "reststate".  If either firststate or reststate
     124                 :  * is REDIRECT, blkno/offnum specify where to link to.
     125                 :  *
     126                 :  * NB: this is used during WAL replay, so beware of trying to make it too
     127                 :  * smart.  In particular, it shouldn't use "state" except for calling
     128                 :  * spgFormDeadTuple().  This is also used in a critical section, so no
     129                 :  * pallocs either!
     130                 :  */
     131                 : void
     132            4302 : spgPageIndexMultiDelete(SpGistState *state, Page page,
     133                 :                         OffsetNumber *itemnos, int nitems,
     134                 :                         int firststate, int reststate,
     135                 :                         BlockNumber blkno, OffsetNumber offnum)
     136                 : {
     137                 :     OffsetNumber firstItem;
     138                 :     OffsetNumber sortednos[MaxIndexTuplesPerPage];
     139            4302 :     SpGistDeadTuple tuple = NULL;
     140                 :     int         i;
     141                 : 
     142            4302 :     if (nitems == 0)
     143             104 :         return;                 /* nothing to do */
     144                 : 
     145                 :     /*
     146                 :      * For efficiency we want to use PageIndexMultiDelete, which requires the
     147                 :      * targets to be listed in sorted order, so we have to sort the itemnos
     148                 :      * array.  (This also greatly simplifies the math for reinserting the
     149                 :      * replacement tuples.)  However, we must not scribble on the caller's
     150                 :      * array, so we have to make a copy.
     151                 :      */
     152            4198 :     memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
     153            4198 :     if (nitems > 1)
     154            4109 :         qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
     155                 : 
     156            4198 :     PageIndexMultiDelete(page, sortednos, nitems);
     157                 : 
     158            4198 :     firstItem = itemnos[0];
     159                 : 
     160          409536 :     for (i = 0; i < nitems; i++)
     161                 :     {
     162          405338 :         OffsetNumber itemno = sortednos[i];
     163                 :         int         tupstate;
     164                 : 
     165          405338 :         tupstate = (itemno == firstItem) ? firststate : reststate;
     166          405338 :         if (tuple == NULL || tuple->tupstate != tupstate)
     167            6487 :             tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
     168                 : 
     169          405338 :         if (PageAddItem(page, (Item) tuple, tuple->size,
     170                 :                         itemno, false, false) != itemno)
     171 UBC           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
     172                 :                  tuple->size);
     173                 : 
     174 CBC      405338 :         if (tupstate == SPGIST_REDIRECT)
     175            1179 :             SpGistPageGetOpaque(page)->nRedirection++;
     176          404159 :         else if (tupstate == SPGIST_PLACEHOLDER)
     177          404159 :             SpGistPageGetOpaque(page)->nPlaceholder++;
     178                 :     }
     179                 : }
     180                 : 
     181                 : /*
     182                 :  * Update the parent inner tuple's downlink, and mark the parent buffer
     183                 :  * dirty (this must be the last change to the parent page in the current
     184                 :  * WAL action).
     185                 :  */
     186                 : static void
     187            4923 : saveNodeLink(Relation index, SPPageDesc *parent,
     188                 :              BlockNumber blkno, OffsetNumber offnum)
     189                 : {
     190                 :     SpGistInnerTuple innerTuple;
     191                 : 
     192            4923 :     innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
     193            4923 :                                                 PageGetItemId(parent->page, parent->offnum));
     194                 : 
     195            4923 :     spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
     196                 : 
     197            4923 :     MarkBufferDirty(parent->buffer);
     198            4923 : }
     199                 : 
     200                 : /*
     201                 :  * Add a leaf tuple to a leaf page where there is known to be room for it
     202                 :  */
     203                 : static void
     204          398773 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
     205                 :              SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
     206                 : {
     207                 :     spgxlogAddLeaf xlrec;
     208                 : 
     209          398773 :     xlrec.newPage = isNew;
     210          398773 :     xlrec.storesNulls = isNulls;
     211                 : 
     212                 :     /* these will be filled below as needed */
     213          398773 :     xlrec.offnumLeaf = InvalidOffsetNumber;
     214          398773 :     xlrec.offnumHeadLeaf = InvalidOffsetNumber;
     215          398773 :     xlrec.offnumParent = InvalidOffsetNumber;
     216          398773 :     xlrec.nodeI = 0;
     217                 : 
     218          398773 :     START_CRIT_SECTION();
     219                 : 
     220          398773 :     if (current->offnum == InvalidOffsetNumber ||
     221          397760 :         SpGistBlockIsRoot(current->blkno))
     222                 :     {
     223                 :         /* Tuple is not part of a chain */
     224            9494 :         SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
     225           18988 :         current->offnum = SpGistPageAddNewItem(state, current->page,
     226            9494 :                                                (Item) leafTuple, leafTuple->size,
     227                 :                                                NULL, false);
     228                 : 
     229            9494 :         xlrec.offnumLeaf = current->offnum;
     230                 : 
     231                 :         /* Must update parent's downlink if any */
     232            9494 :         if (parent->buffer != InvalidBuffer)
     233                 :         {
     234            1013 :             xlrec.offnumParent = parent->offnum;
     235            1013 :             xlrec.nodeI = parent->node;
     236                 : 
     237            1013 :             saveNodeLink(index, parent, current->blkno, current->offnum);
     238                 :         }
     239                 :     }
     240                 :     else
     241                 :     {
     242                 :         /*
     243                 :          * Tuple must be inserted into existing chain.  We mustn't change the
     244                 :          * chain's head address, but we don't need to chase the entire chain
     245                 :          * to put the tuple at the end; we can insert it second.
     246                 :          *
     247                 :          * Also, it's possible that the "chain" consists only of a DEAD tuple,
     248                 :          * in which case we should replace the DEAD tuple in-place.
     249                 :          */
     250                 :         SpGistLeafTuple head;
     251                 :         OffsetNumber offnum;
     252                 : 
     253          389279 :         head = (SpGistLeafTuple) PageGetItem(current->page,
     254          389279 :                                              PageGetItemId(current->page, current->offnum));
     255          389279 :         if (head->tupstate == SPGIST_LIVE)
     256                 :         {
     257          389279 :             SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
     258          389279 :             offnum = SpGistPageAddNewItem(state, current->page,
     259          389279 :                                           (Item) leafTuple, leafTuple->size,
     260                 :                                           NULL, false);
     261                 : 
     262                 :             /*
     263                 :              * re-get head of list because it could have been moved on page,
     264                 :              * and set new second element
     265                 :              */
     266          389279 :             head = (SpGistLeafTuple) PageGetItem(current->page,
     267          389279 :                                                  PageGetItemId(current->page, current->offnum));
     268          389279 :             SGLT_SET_NEXTOFFSET(head, offnum);
     269                 : 
     270          389279 :             xlrec.offnumLeaf = offnum;
     271          389279 :             xlrec.offnumHeadLeaf = current->offnum;
     272                 :         }
     273 UBC           0 :         else if (head->tupstate == SPGIST_DEAD)
     274                 :         {
     275               0 :             SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
     276               0 :             PageIndexTupleDelete(current->page, current->offnum);
     277               0 :             if (PageAddItem(current->page,
     278                 :                             (Item) leafTuple, leafTuple->size,
     279               0 :                             current->offnum, false, false) != current->offnum)
     280               0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     281                 :                      leafTuple->size);
     282                 : 
     283                 :             /* WAL replay distinguishes this case by equal offnums */
     284               0 :             xlrec.offnumLeaf = current->offnum;
     285               0 :             xlrec.offnumHeadLeaf = current->offnum;
     286                 :         }
     287                 :         else
     288               0 :             elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
     289                 :     }
     290                 : 
     291 CBC      398773 :     MarkBufferDirty(current->buffer);
     292                 : 
     293          398773 :     if (RelationNeedsWAL(index) && !state->isBuild)
     294                 :     {
     295                 :         XLogRecPtr  recptr;
     296                 :         int         flags;
     297                 : 
     298          120190 :         XLogBeginInsert();
     299          120190 :         XLogRegisterData((char *) &xlrec, sizeof(xlrec));
     300          120190 :         XLogRegisterData((char *) leafTuple, leafTuple->size);
     301                 : 
     302          120190 :         flags = REGBUF_STANDARD;
     303          120190 :         if (xlrec.newPage)
     304               1 :             flags |= REGBUF_WILL_INIT;
     305          120190 :         XLogRegisterBuffer(0, current->buffer, flags);
     306          120190 :         if (xlrec.offnumParent != InvalidOffsetNumber)
     307             415 :             XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
     308                 : 
     309          120190 :         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
     310                 : 
     311          120190 :         PageSetLSN(current->page, recptr);
     312                 : 
     313                 :         /* update parent only if we actually changed it */
     314          120190 :         if (xlrec.offnumParent != InvalidOffsetNumber)
     315                 :         {
     316             415 :             PageSetLSN(parent->page, recptr);
     317                 :         }
     318                 :     }
     319                 : 
     320          398773 :     END_CRIT_SECTION();
     321          398773 : }
     322                 : 
     323                 : /*
     324                 :  * Count the number and total size of leaf tuples in the chain starting at
     325                 :  * current->offnum.  Return number into *nToSplit and total size as function
     326                 :  * result.
     327                 :  *
     328                 :  * Klugy special case when considering the root page (i.e., root is a leaf
     329                 :  * page, but we're about to split for the first time): return fake large
     330                 :  * values to force spgdoinsert() to take the doPickSplit rather than
     331                 :  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
     332                 :  */
     333                 : static int
     334            3948 : checkSplitConditions(Relation index, SpGistState *state,
     335                 :                      SPPageDesc *current, int *nToSplit)
     336                 : {
     337                 :     int         i,
     338            3948 :                 n = 0,
     339            3948 :                 totalSize = 0;
     340                 : 
     341            3948 :     if (SpGistBlockIsRoot(current->blkno))
     342                 :     {
     343                 :         /* return impossible values to force split */
     344              41 :         *nToSplit = BLCKSZ;
     345              41 :         return BLCKSZ;
     346                 :     }
     347                 : 
     348            3907 :     i = current->offnum;
     349          400149 :     while (i != InvalidOffsetNumber)
     350                 :     {
     351                 :         SpGistLeafTuple it;
     352                 : 
     353          396242 :         Assert(i >= FirstOffsetNumber &&
     354                 :                i <= PageGetMaxOffsetNumber(current->page));
     355          396242 :         it = (SpGistLeafTuple) PageGetItem(current->page,
     356                 :                                            PageGetItemId(current->page, i));
     357          396242 :         if (it->tupstate == SPGIST_LIVE)
     358                 :         {
     359          396242 :             n++;
     360          396242 :             totalSize += it->size + sizeof(ItemIdData);
     361                 :         }
     362 UBC           0 :         else if (it->tupstate == SPGIST_DEAD)
     363                 :         {
     364                 :             /* We could see a DEAD tuple as first/only chain item */
     365               0 :             Assert(i == current->offnum);
     366               0 :             Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
     367                 :             /* Don't count it in result, because it won't go to other page */
     368                 :         }
     369                 :         else
     370               0 :             elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     371                 : 
     372 CBC      396242 :         i = SGLT_GET_NEXTOFFSET(it);
     373                 :     }
     374                 : 
     375            3907 :     *nToSplit = n;
     376                 : 
     377            3907 :     return totalSize;
     378                 : }
     379                 : 
     380                 : /*
     381                 :  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
     382                 :  * but the chain has to be moved because there's not enough room to add
     383                 :  * newLeafTuple to its page.  We use this method when the chain contains
     384                 :  * very little data so a split would be inefficient.  We are sure we can
     385                 :  * fit the chain plus newLeafTuple on one other page.
     386                 :  */
     387                 : static void
     388            1108 : moveLeafs(Relation index, SpGistState *state,
     389                 :           SPPageDesc *current, SPPageDesc *parent,
     390                 :           SpGistLeafTuple newLeafTuple, bool isNulls)
     391                 : {
     392                 :     int         i,
     393                 :                 nDelete,
     394                 :                 nInsert,
     395                 :                 size;
     396                 :     Buffer      nbuf;
     397                 :     Page        npage;
     398            1108 :     OffsetNumber r = InvalidOffsetNumber,
     399            1108 :                 startOffset = InvalidOffsetNumber;
     400 GIC        1108 :     bool        replaceDead = false;
     401                 :     OffsetNumber *toDelete;
     402                 :     OffsetNumber *toInsert;
     403                 :     BlockNumber nblkno;
     404                 :     spgxlogMoveLeafs xlrec;
     405                 :     char       *leafdata,
     406                 :                *leafptr;
     407                 : 
     408 ECB             :     /* This doesn't work on root page */
     409 CBC        1108 :     Assert(parent->buffer != InvalidBuffer);
     410 GIC        1108 :     Assert(parent->buffer != current->buffer);
     411                 : 
     412 ECB             :     /* Locate the tuples to be moved, and count up the space needed */
     413 CBC        1108 :     i = PageGetMaxOffsetNumber(current->page);
     414            1108 :     toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
     415 GIC        1108 :     toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
     416 ECB             : 
     417 GIC        1108 :     size = newLeafTuple->size + sizeof(ItemIdData);
     418 ECB             : 
     419 CBC        1108 :     nDelete = 0;
     420            1108 :     i = current->offnum;
     421 GIC       45299 :     while (i != InvalidOffsetNumber)
     422                 :     {
     423                 :         SpGistLeafTuple it;
     424 ECB             : 
     425 GIC       44191 :         Assert(i >= FirstOffsetNumber &&
     426 ECB             :                i <= PageGetMaxOffsetNumber(current->page));
     427 GIC       44191 :         it = (SpGistLeafTuple) PageGetItem(current->page,
     428                 :                                            PageGetItemId(current->page, i));
     429 ECB             : 
     430 GIC       44191 :         if (it->tupstate == SPGIST_LIVE)
     431 ECB             :         {
     432 CBC       44191 :             toDelete[nDelete] = i;
     433           44191 :             size += it->size + sizeof(ItemIdData);
     434 GIC       44191 :             nDelete++;
     435 EUB             :         }
     436 UIC           0 :         else if (it->tupstate == SPGIST_DEAD)
     437                 :         {
     438 EUB             :             /* We could see a DEAD tuple as first/only chain item */
     439 UBC           0 :             Assert(i == current->offnum);
     440 UIC           0 :             Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
     441 EUB             :             /* We don't want to move it, so don't count it in size */
     442 UBC           0 :             toDelete[nDelete] = i;
     443               0 :             nDelete++;
     444 UIC           0 :             replaceDead = true;
     445                 :         }
     446 EUB             :         else
     447 UIC           0 :             elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     448 ECB             : 
     449 GIC       44191 :         i = SGLT_GET_NEXTOFFSET(it);
     450                 :     }
     451                 : 
     452 ECB             :     /* Find a leaf page that will hold them */
     453 GIC        1108 :     nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
     454 ECB             :                            size, &xlrec.newPage);
     455 CBC        1108 :     npage = BufferGetPage(nbuf);
     456            1108 :     nblkno = BufferGetBlockNumber(nbuf);
     457 GIC        1108 :     Assert(nblkno != current->blkno);
     458 ECB             : 
     459 GIC        1108 :     leafdata = leafptr = palloc(size);
     460 ECB             : 
     461 GIC        1108 :     START_CRIT_SECTION();
     462                 : 
     463 ECB             :     /* copy all the old tuples to new page, unless they're dead */
     464 CBC        1108 :     nInsert = 0;
     465 GIC        1108 :     if (!replaceDead)
     466 ECB             :     {
     467 GIC       45299 :         for (i = 0; i < nDelete; i++)
     468                 :         {
     469                 :             SpGistLeafTuple it;
     470                 : 
     471           44191 :             it = (SpGistLeafTuple) PageGetItem(current->page,
     472 CBC       44191 :                                                PageGetItemId(current->page, toDelete[i]));
     473           44191 :             Assert(it->tupstate == SPGIST_LIVE);
     474 ECB             : 
     475                 :             /*
     476                 :              * Update chain link (notice the chain order gets reversed, but we
     477                 :              * don't care).  We're modifying the tuple on the source page
     478                 :              * here, but it's okay since we're about to delete it.
     479                 :              */
     480 GIC       44191 :             SGLT_SET_NEXTOFFSET(it, r);
     481 ECB             : 
     482 GIC       44191 :             r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
     483 ECB             :                                      &startOffset, false);
     484                 : 
     485 GIC       44191 :             toInsert[nInsert] = r;
     486 CBC       44191 :             nInsert++;
     487 ECB             : 
     488                 :             /* save modified tuple into leafdata as well */
     489 GIC       44191 :             memcpy(leafptr, it, it->size);
     490 CBC       44191 :             leafptr += it->size;
     491 ECB             :         }
     492                 :     }
     493                 : 
     494                 :     /* add the new tuple as well */
     495 GIC        1108 :     SGLT_SET_NEXTOFFSET(newLeafTuple, r);
     496 CBC        1108 :     r = SpGistPageAddNewItem(state, npage,
     497            1108 :                              (Item) newLeafTuple, newLeafTuple->size,
     498 ECB             :                              &startOffset, false);
     499 GIC        1108 :     toInsert[nInsert] = r;
     500 CBC        1108 :     nInsert++;
     501            1108 :     memcpy(leafptr, newLeafTuple, newLeafTuple->size);
     502            1108 :     leafptr += newLeafTuple->size;
     503 ECB             : 
     504                 :     /*
     505                 :      * Now delete the old tuples, leaving a redirection pointer behind for the
     506                 :      * first one, unless we're doing an index build; in which case there can't
     507                 :      * be any concurrent scan so we need not provide a redirect.
     508                 :      */
     509 GIC        1108 :     spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
     510 CBC        1108 :                             state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     511 ECB             :                             SPGIST_PLACEHOLDER,
     512                 :                             nblkno, r);
     513                 : 
     514                 :     /* Update parent's downlink and mark parent page dirty */
     515 GIC        1108 :     saveNodeLink(index, parent, nblkno, r);
     516 ECB             : 
     517                 :     /* Mark the leaf pages too */
     518 GIC        1108 :     MarkBufferDirty(current->buffer);
     519 CBC        1108 :     MarkBufferDirty(nbuf);
     520 ECB             : 
     521 GIC        1108 :     if (RelationNeedsWAL(index) && !state->isBuild)
     522 ECB             :     {
     523                 :         XLogRecPtr  recptr;
     524                 : 
     525                 :         /* prepare WAL info */
     526 GIC         262 :         STORE_STATE(state, xlrec.stateSrc);
     527 ECB             : 
     528 GIC         262 :         xlrec.nMoves = nDelete;
     529 CBC         262 :         xlrec.replaceDead = replaceDead;
     530             262 :         xlrec.storesNulls = isNulls;
     531 ECB             : 
     532 GIC         262 :         xlrec.offnumParent = parent->offnum;
     533 CBC         262 :         xlrec.nodeI = parent->node;
     534 ECB             : 
     535 GIC         262 :         XLogBeginInsert();
     536 CBC         262 :         XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
     537             262 :         XLogRegisterData((char *) toDelete,
     538 ECB             :                          sizeof(OffsetNumber) * nDelete);
     539 GIC         262 :         XLogRegisterData((char *) toInsert,
     540 ECB             :                          sizeof(OffsetNumber) * nInsert);
     541 GIC         262 :         XLogRegisterData((char *) leafdata, leafptr - leafdata);
     542 ECB             : 
     543 GIC         262 :         XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
     544 CBC         262 :         XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
     545             262 :         XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
     546 ECB             : 
     547 GIC         262 :         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
     548 ECB             : 
     549 GIC         262 :         PageSetLSN(current->page, recptr);
     550 CBC         262 :         PageSetLSN(npage, recptr);
     551             262 :         PageSetLSN(parent->page, recptr);
     552 ECB             :     }
     553                 : 
     554 GIC        1108 :     END_CRIT_SECTION();
     555 ECB             : 
     556                 :     /* Update local free-space cache and release new buffer */
     557 GIC        1108 :     SpGistSetLastUsedPage(index, nbuf);
     558 CBC        1108 :     UnlockReleaseBuffer(nbuf);
     559            1108 : }
     560 ECB             : 
     561                 : /*
     562                 :  * Update previously-created redirection tuple with appropriate destination
     563                 :  *
     564                 :  * We use this when it's not convenient to know the destination first.
     565                 :  * The tuple should have been made with the "impossible" destination of
     566                 :  * the metapage.
                         :  *
                         :  * current->page is the page holding the redirection tuple and position is
                         :  * the tuple's offset number on that page.  blkno/offnum are the real
                         :  * destination to store into the tuple's pointer field.
                         :  *
                         :  * In assert-enabled builds we verify that the tuple found at position is
                         :  * in SPGIST_REDIRECT state and that its pointer still names
                         :  * SPGIST_METAPAGE_BLKNO, ie, that it has not been given a destination yet.
                         :  *
                         :  * This function only rewrites the pointer in place; it does not itself
                         :  * mark the buffer dirty or emit WAL, so the caller has to see to that.
     567                 :  */
     568                 : static void
     569 GIC         639 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
     570 ECB             :                     BlockNumber blkno, OffsetNumber offnum)
     571                 : {
     572                 :     SpGistDeadTuple dt;
     573                 : 
     574 GIC         639 :     dt = (SpGistDeadTuple) PageGetItem(current->page,
     575 ECB             :                                        PageGetItemId(current->page, position));
     576 GIC         639 :     Assert(dt->tupstate == SPGIST_REDIRECT);
     577 CBC         639 :     Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
     578             639 :     ItemPointerSet(&dt->pointer, blkno, offnum);   /* replace the dummy destination */
     579             639 : }
     580 ECB             : 
     581                 : /*
     582                 :  * Test to see if the user-defined picksplit function failed to do its job,
     583                 :  * ie, it put all the leaf tuples into the same node.
     584                 :  * If so, randomly divide the tuples into several nodes (all with the same
     585                 :  * label) and return true to select allTheSame mode for this inner tuple.
     586                 :  *
     587                 :  * (This code is also used to forcibly select allTheSame mode for nulls.)
     588                 :  *
     589                 :  * If we know that the leaf tuples wouldn't all fit on one page, then we
     590                 :  * exclude the last tuple (which is the incoming new tuple that forced a split)
     591                 :  * from the check to see if more than one node is used.  The reason for this
     592                 :  * is that if the existing tuples are put into only one chain, then even if
     593                 :  * we move them all to an empty page, there would still not be room for the
     594                 :  * new tuple, so we'd get into an infinite loop of picksplit attempts.
     595                 :  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
     596                 :  * be split across pages.  (Exercise for the reader: figure out why this
     597                 :  * fixes the problem even when there is only one old tuple.)
                         :  *
                         :  * Note that "randomly" above just means "arbitrarily": the division is a
                         :  * deterministic round-robin, with tuple i going to node i % out->nNodes.
                         :  *
                         :  * Arguments, as used below:
                         :  *  in          only in->nTuples is consulted; the incoming new tuple is
                         :  *              the last entry (index in->nTuples - 1).
                         :  *  out         the picksplit result.  It is left unmodified when we
                         :  *              return false.  When we return true, out->nNodes is set
                         :  *              to 8, out->mapTuplesToNodes[] is overwritten for all
                         :  *              in->nTuples entries, and, if out->nodeLabels is not NULL,
                         :  *              it is replaced by a freshly palloc'd array in which every
                         :  *              entry is the label of the single node that picksplit
                         :  *              had used (the previous array is not freed here).
                         :  *  tooBig      true if the tuple set doesn't fit on one page; the last
                         :  *              tuple is then left out of the "only one node?" test.
                         :  *  includeNew  output: set to false only when we return true, tooBig
                         :  *              is set, and picksplit mapped the new tuple to a different
                         :  *              node than the old tuples; otherwise set to true.  The
                         :  *              new tuple's map entry is reassigned along with the
                         :  *              others even in that case.
                         :  *
                         :  * Returns false if there is at most one tuple in total or if the tested
                         :  * tuples populate more than one node; returns true after overriding the
                         :  * picksplit output as described above.
     598                 :  */
     599                 : static bool
     600 GIC        2840 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
     601 ECB             :                 bool *includeNew)
     602                 : {
     603                 :     int         theNode;
     604                 :     int         limit;
     605                 :     int         i;
     606                 : 
     607                 :     /* For the moment, assume we can include the new leaf tuple */
     608 GIC        2840 :     *includeNew = true;
     609 ECB             : 
     610                 :     /* If there's only the new leaf tuple, don't select allTheSame mode */
     611 GIC        2840 :     if (in->nTuples <= 1)
     612 CBC          22 :         return false;
     613 ECB             : 
     614                 :     /* If tuple set doesn't fit on one page, ignore the new tuple in test */
     615 GIC        2818 :     limit = tooBig ? in->nTuples - 1 : in->nTuples;
     616 ECB             : 
     617                 :     /* Check to see if more than one node is populated */
     618 GIC        2818 :     theNode = out->mapTuplesToNodes[0];
     619 CBC       63028 :     for (i = 1; i < limit; i++)
     620 ECB             :     {
     621 GIC       62876 :         if (out->mapTuplesToNodes[i] != theNode)
     622 CBC        2666 :             return false;
     623 ECB             :     }
     624                 : 
     625                 :     /* Nope, so override the picksplit function's decisions */
     626                 : 
     627                 :     /* If the new tuple is in its own node, it can't be included in split */
     628 GIC         152 :     if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
     629 LBC           0 :         *includeNew = false;
     630 EUB             : 
     631 GIC         152 :     out->nNodes = 8;         /* arbitrary number of child nodes */
     632 ECB             : 
     633                 :     /* Arbitrary (round-robin) assignment of tuples to nodes (note we include new tuple) */
     634 GIC       16118 :     for (i = 0; i < in->nTuples; i++)
     635 CBC       15966 :         out->mapTuplesToNodes[i] = i % out->nNodes;
     636 ECB             : 
     637                 :     /* The opclass may not use node labels, but if it does, duplicate 'em */
     638 GIC         152 :     if (out->nodeLabels)
     639 ECB             :     {
     640 GIC          26 :         Datum       theLabel = out->nodeLabels[theNode];
     641 ECB             : 
     642 GIC          26 :         out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
     643 CBC         234 :         for (i = 0; i < out->nNodes; i++)
     644             208 :             out->nodeLabels[i] = theLabel;
     645 ECB             :     }
     646                 : 
     647                 :     /* We don't touch the prefix or the leaf tuple datum assignments */
     648                 : 
     649 GIC         152 :     return true;
     650 ECB             : }
     651                 : 
     652                 : /*
     653                 :  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
     654                 :  * but the chain has to be split because there's not enough room to add
     655                 :  * newLeafTuple to its page.
     656                 :  *
     657                 :  * This function splits the leaf tuple set according to picksplit's rules,
     658                 :  * creating one or more new chains that are spread across the current page
     659                 :  * and an additional leaf page (we assume that two leaf pages will be
     660                 :  * sufficient).  A new inner tuple is created, and the parent downlink
     661                 :  * pointer is updated to point to that inner tuple instead of the leaf chain.
     662                 :  *
     663                 :  * On exit, current contains the address of the new inner tuple.
     664                 :  *
     665                 :  * Returns true if we successfully inserted newLeafTuple during this function,
     666                 :  * false if caller still has to do it (meaning another picksplit operation is
     667                 :  * probably needed).  Failure could occur if the picksplit result is fairly
     668                 :  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
     669                 :  * Because we force the picksplit result to be at least two chains, each
     670                 :  * cycle will get rid of at least one leaf tuple from the chain, so the loop
     671                 :  * will eventually terminate if lack of balance is the issue.  If the tuple
     672                 :  * is too big, we assume that repeated picksplit operations will eventually
     673                 :  * make it small enough by repeated prefix-stripping.  A broken opclass could
     674                 :  * make this an infinite loop, though, so spgdoinsert() checks that the
     675                 :  * leaf datums get smaller each time.
     676                 :  */
     677                 : static bool
     678 GIC        2840 : doPickSplit(Relation index, SpGistState *state,
     679 ECB             :             SPPageDesc *current, SPPageDesc *parent,
     680                 :             SpGistLeafTuple newLeafTuple,
     681                 :             int level, bool isNulls, bool isNew)
     682                 : {
     683 GIC        2840 :     bool        insertedNew = false;
     684 ECB             :     spgPickSplitIn in;
     685                 :     spgPickSplitOut out;
     686                 :     FmgrInfo   *procinfo;
     687                 :     bool        includeNew;
     688                 :     int         i,
     689                 :                 max,
     690                 :                 n;
     691                 :     SpGistInnerTuple innerTuple;
     692                 :     SpGistNodeTuple node,
     693                 :                *nodes;
     694                 :     Buffer      newInnerBuffer,
     695                 :                 newLeafBuffer;
     696                 :     uint8      *leafPageSelect;
     697                 :     int        *leafSizes;
     698                 :     OffsetNumber *toDelete;
     699                 :     OffsetNumber *toInsert;
     700 GIC        2840 :     OffsetNumber redirectTuplePos = InvalidOffsetNumber;
     701 ECB             :     OffsetNumber startOffsets[2];
     702                 :     SpGistLeafTuple *oldLeafs;
     703                 :     SpGistLeafTuple *newLeafs;
     704                 :     Datum       leafDatums[INDEX_MAX_KEYS];
     705                 :     bool        leafIsnulls[INDEX_MAX_KEYS];
     706                 :     int         spaceToDelete;
     707                 :     int         currentFreeSpace;
     708                 :     int         totalLeafSizes;
     709                 :     bool        allTheSame;
     710                 :     spgxlogPickSplit xlrec;
     711                 :     char       *leafdata,
     712                 :                *leafptr;
     713                 :     SPPageDesc  saveCurrent;
     714                 :     int         nToDelete,
     715                 :                 nToInsert,
     716                 :                 maxToInclude;
     717                 : 
     718 GIC        2840 :     in.level = level;
     719 ECB             : 
     720                 :     /*
     721                 :      * Allocate per-leaf-tuple work arrays with max possible size
     722                 :      */
     723 GIC        2840 :     max = PageGetMaxOffsetNumber(current->page);
     724 CBC        2840 :     n = max + 1;
     725            2840 :     in.datums = (Datum *) palloc(sizeof(Datum) * n);
     726            2840 :     toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
     727            2840 :     toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
     728            2840 :     oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
     729            2840 :     newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
     730            2840 :     leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
     731 ECB             : 
     732 GIC        2840 :     STORE_STATE(state, xlrec.stateSrc);
     733 ECB             : 
     734                 :     /*
     735                 :      * Form list of leaf tuples which will be distributed as split result;
     736                 :      * also, count up the amount of space that will be freed from current.
     737                 :      * (Note that in the non-root case, we won't actually delete the old
     738                 :      * tuples, only replace them with redirects or placeholders.)
     739                 :      */
     740 GIC        2840 :     nToInsert = 0;
     741 CBC        2840 :     nToDelete = 0;
     742            2840 :     spaceToDelete = 0;
     743            2840 :     if (SpGistBlockIsRoot(current->blkno))
     744 ECB             :     {
     745                 :         /*
     746                 :          * We are splitting the root (which up to now is also a leaf page).
     747                 :          * Its tuples are not linked, so scan sequentially to get them all. We
     748                 :          * ignore the original value of current->offnum.
     749                 :          */
     750 GIC        7709 :         for (i = FirstOffsetNumber; i <= max; i++)
     751 ECB             :         {
     752                 :             SpGistLeafTuple it;
     753                 : 
     754 GIC        7668 :             it = (SpGistLeafTuple) PageGetItem(current->page,
     755 ECB             :                                                PageGetItemId(current->page, i));
     756 GIC        7668 :             if (it->tupstate == SPGIST_LIVE)
     757 ECB             :             {
     758 GIC        7668 :                 in.datums[nToInsert] =
     759 CBC        7668 :                     isNulls ? (Datum) 0 : SGLTDATUM(it, state);
     760            7668 :                 oldLeafs[nToInsert] = it;
     761            7668 :                 nToInsert++;
     762            7668 :                 toDelete[nToDelete] = i;
     763            7668 :                 nToDelete++;
     764 ECB             :                 /* we will delete the tuple altogether, so count full space */
     765 GIC        7668 :                 spaceToDelete += it->size + sizeof(ItemIdData);
     766 ECB             :             }
     767                 :             else                /* tuples on root should be live */
     768 UIC           0 :                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     769 EUB             :         }
     770                 :     }
     771                 :     else
     772                 :     {
     773                 :         /* Normal case, just collect the leaf tuples in the chain */
     774 GIC        2799 :         i = current->offnum;
     775 CBC      354850 :         while (i != InvalidOffsetNumber)
     776 ECB             :         {
     777                 :             SpGistLeafTuple it;
     778                 : 
     779 GIC      352051 :             Assert(i >= FirstOffsetNumber && i <= max);
     780 CBC      352051 :             it = (SpGistLeafTuple) PageGetItem(current->page,
     781 ECB             :                                                PageGetItemId(current->page, i));
     782 GIC      352051 :             if (it->tupstate == SPGIST_LIVE)
     783 ECB             :             {
     784 GIC      352051 :                 in.datums[nToInsert] =
     785 CBC      352051 :                     isNulls ? (Datum) 0 : SGLTDATUM(it, state);
     786          352051 :                 oldLeafs[nToInsert] = it;
     787          352051 :                 nToInsert++;
     788          352051 :                 toDelete[nToDelete] = i;
     789          352051 :                 nToDelete++;
     790 ECB             :                 /* we will not delete the tuple, only replace with dead */
     791 GIC      352051 :                 Assert(it->size >= SGDTSIZE);
     792 CBC      352051 :                 spaceToDelete += it->size - SGDTSIZE;
     793 ECB             :             }
     794 UIC           0 :             else if (it->tupstate == SPGIST_DEAD)
     795 EUB             :             {
     796                 :                 /* We could see a DEAD tuple as first/only chain item */
     797 UIC           0 :                 Assert(i == current->offnum);
     798 UBC           0 :                 Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
     799               0 :                 toDelete[nToDelete] = i;
     800               0 :                 nToDelete++;
     801 EUB             :                 /* replacing it with redirect will save no space */
     802                 :             }
     803                 :             else
     804 UIC           0 :                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
     805 EUB             : 
     806 GIC      352051 :             i = SGLT_GET_NEXTOFFSET(it);
     807 ECB             :         }
     808                 :     }
     809 GIC        2840 :     in.nTuples = nToInsert;
     810 ECB             : 
     811                 :     /*
     812                 :      * We may not actually insert new tuple because another picksplit may be
     813                 :      * necessary due to too large value, but we will try to allocate enough
     814                 :      * space to include it; and in any case it has to be included in the input
     815                 :      * for the picksplit function.  So don't increment nToInsert yet.
     816                 :      */
     817 GIC        2840 :     in.datums[in.nTuples] =
     818 CBC        2840 :         isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
     819            2840 :     oldLeafs[in.nTuples] = newLeafTuple;
     820            2840 :     in.nTuples++;
     821 ECB             : 
     822 GIC        2840 :     memset(&out, 0, sizeof(out));
     823 ECB             : 
     824 GIC        2840 :     if (!isNulls)
     825 ECB             :     {
     826                 :         /*
     827                 :          * Perform split using user-defined method.
     828                 :          */
     829 GIC        2840 :         procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
     830 CBC        2840 :         FunctionCall2Coll(procinfo,
     831            2840 :                           index->rd_indcollation[0],
     832 ECB             :                           PointerGetDatum(&in),
     833                 :                           PointerGetDatum(&out));
     834                 : 
     835                 :         /*
     836                 :          * Form new leaf tuples and count up the total space needed.
     837                 :          */
     838 GIC        2840 :         totalLeafSizes = 0;
     839 CBC      365399 :         for (i = 0; i < in.nTuples; i++)
     840 ECB             :         {
     841 GIC      362559 :             if (state->leafTupDesc->natts > 1)
     842 CBC       32206 :                 spgDeformLeafTuple(oldLeafs[i],
     843 ECB             :                                    state->leafTupDesc,
     844                 :                                    leafDatums,
     845                 :                                    leafIsnulls,
     846                 :                                    isNulls);
     847                 : 
     848 GIC      362559 :             leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
     849 CBC      362559 :             leafIsnulls[spgKeyColumn] = false;
     850 ECB             : 
     851 GIC      362559 :             newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
     852 ECB             :                                            leafDatums,
     853                 :                                            leafIsnulls);
     854 GIC      362559 :             totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
     855 ECB             :         }
     856                 :     }
     857                 :     else
     858                 :     {
     859                 :         /*
     860                 :          * Perform dummy split that puts all tuples into one node.
     861                 :          * checkAllTheSame will override this and force allTheSame mode.
     862                 :          */
     863 UIC           0 :         out.hasPrefix = false;
     864 UBC           0 :         out.nNodes = 1;
     865               0 :         out.nodeLabels = NULL;
     866               0 :         out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
     867 EUB             : 
     868                 :         /*
     869                 :          * Form new leaf tuples and count up the total space needed.
     870                 :          */
     871 UIC           0 :         totalLeafSizes = 0;
     872 UBC           0 :         for (i = 0; i < in.nTuples; i++)
     873 EUB             :         {
     874 UIC           0 :             if (state->leafTupDesc->natts > 1)
     875 UBC           0 :                 spgDeformLeafTuple(oldLeafs[i],
     876 EUB             :                                    state->leafTupDesc,
     877                 :                                    leafDatums,
     878                 :                                    leafIsnulls,
     879                 :                                    isNulls);
     880                 : 
     881                 :             /*
     882                 :              * Nulls tree can contain only null key values.
     883                 :              */
     884 UIC           0 :             leafDatums[spgKeyColumn] = (Datum) 0;
     885 UBC           0 :             leafIsnulls[spgKeyColumn] = true;
     886 EUB             : 
     887 UIC           0 :             newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
     888 EUB             :                                            leafDatums,
     889                 :                                            leafIsnulls);
     890 UIC           0 :             totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
     891 EUB             :         }
     892                 :     }
     893                 : 
     894                 :     /*
     895                 :      * Check to see if the picksplit function failed to separate the values,
     896                 :      * ie, it put them all into the same child node.  If so, select allTheSame
     897                 :      * mode and create a random split instead.  See comments for
     898                 :      * checkAllTheSame as to why we need to know if the new leaf tuples could
     899                 :      * fit on one page.
     900                 :      */
     901 GIC        2840 :     allTheSame = checkAllTheSame(&in, &out,
     902 ECB             :                                  totalLeafSizes > SPGIST_PAGE_CAPACITY,
     903                 :                                  &includeNew);
     904                 : 
     905                 :     /*
     906                 :      * If checkAllTheSame decided we must exclude the new tuple, don't
     907                 :      * consider it any further.
     908                 :      */
     909 GIC        2840 :     if (includeNew)
     910 CBC        2840 :         maxToInclude = in.nTuples;
     911 ECB             :     else
     912                 :     {
     913 UIC           0 :         maxToInclude = in.nTuples - 1;
     914 UBC           0 :         totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
     915 EUB             :     }
     916                 : 
     917                 :     /*
     918                 :      * Allocate per-node work arrays.  Since checkAllTheSame could replace
     919                 :      * out.nNodes with a value larger than the number of tuples on the input
     920                 :      * page, we can't allocate these arrays before here.
     921                 :      */
     922 GIC        2840 :     nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
     923 CBC        2840 :     leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
     924 ECB             : 
     925                 :     /*
     926                 :      * Form nodes of inner tuple and inner tuple itself
     927                 :      */
     928 GIC       22035 :     for (i = 0; i < out.nNodes; i++)
     929 ECB             :     {
     930 GIC       19195 :         Datum       label = (Datum) 0;
     931 CBC       19195 :         bool        labelisnull = (out.nodeLabels == NULL);
     932 ECB             : 
     933 GIC       19195 :         if (!labelisnull)
     934 CBC        1791 :             label = out.nodeLabels[i];
     935           19195 :         nodes[i] = spgFormNodeTuple(state, label, labelisnull);
     936 ECB             :     }
     937 GIC        2840 :     innerTuple = spgFormInnerTuple(state,
     938 CBC        2840 :                                    out.hasPrefix, out.prefixDatum,
     939 ECB             :                                    out.nNodes, nodes);
     940 GIC        2840 :     innerTuple->allTheSame = allTheSame;
     941 ECB             : 
     942                 :     /*
     943                 :      * Update nodes[] array to point into the newly formed innerTuple, so that
     944                 :      * we can adjust their downlinks below.
     945                 :      */
     946 GIC       22035 :     SGITITERATE(innerTuple, i, node)
     947 ECB             :     {
     948 GIC       19195 :         nodes[i] = node;
     949 ECB             :     }
     950                 : 
     951                 :     /*
     952                 :      * Re-scan new leaf tuples and count up the space needed under each node.
     953                 :      */
     954 GIC      365399 :     for (i = 0; i < maxToInclude; i++)
     955 ECB             :     {
     956 GIC      362559 :         n = out.mapTuplesToNodes[i];
     957 CBC      362559 :         if (n < 0 || n >= out.nNodes)
     958 LBC           0 :             elog(ERROR, "inconsistent result of SPGiST picksplit function");
     959 GBC      362559 :         leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
     960 ECB             :     }
     961                 : 
     962                 :     /*
     963                 :      * To perform the split, we must insert a new inner tuple, which can't go
     964                 :      * on a leaf page; and unless we are splitting the root page, we must then
     965                 :      * update the parent tuple's downlink to point to the inner tuple.  If
     966                 :      * there is room, we'll put the new inner tuple on the same page as the
     967                 :      * parent tuple, otherwise we need another non-leaf buffer. But if the
     968                 :      * parent page is the root, we can't add the new inner tuple there,
     969                 :      * because the root page must have only one inner tuple.
     970                 :      */
     971 GIC        2840 :     xlrec.initInner = false;
     972 CBC        2840 :     if (parent->buffer != InvalidBuffer &&
     973            2799 :         !SpGistBlockIsRoot(parent->blkno) &&
     974            2619 :         (SpGistPageGetFreeSpace(parent->page, 1) >=
     975            2619 :          innerTuple->size + sizeof(ItemIdData)))
     976 ECB             :     {
     977                 :         /* New inner tuple will fit on parent page */
     978 GIC        2415 :         newInnerBuffer = parent->buffer;
     979 ECB             :     }
     980 GIC         425 :     else if (parent->buffer != InvalidBuffer)
     981 ECB             :     {
     982                 :         /* Send tuple to page with next triple parity (see README) */
     983 GIC         768 :         newInnerBuffer = SpGistGetBuffer(index,
     984 CBC         384 :                                          GBUF_INNER_PARITY(parent->blkno + 1) |
     985             384 :                                          (isNulls ? GBUF_NULLS : 0),
     986             384 :                                          innerTuple->size + sizeof(ItemIdData),
     987 ECB             :                                          &xlrec.initInner);
     988                 :     }
     989                 :     else
     990                 :     {
     991                 :         /* Root page split ... inner tuple will go to root page */
     992 GIC          41 :         newInnerBuffer = InvalidBuffer;
     993 ECB             :     }
     994                 : 
     995                 :     /*
     996                 :      * The new leaf tuples converted from the existing ones should require the
     997                 :      * same or less space, and therefore should all fit onto one page
     998                 :      * (although that's not necessarily the current page, since we can't
     999                 :      * delete the old tuples but only replace them with placeholders).
    1000                 :      * However, the incoming new tuple might not also fit, in which case we
    1001                 :      * might need another picksplit cycle to reduce it some more.
    1002                 :      *
    1003                 :      * If there's not room to put everything back onto the current page, then
    1004                 :      * we decide on a per-node basis which tuples go to the new page. (We do
    1005                 :      * it like that because leaf tuple chains can't cross pages, so we must
    1006                 :      * place all leaf tuples belonging to the same parent node on the same
    1007                 :      * page.)
    1008                 :      *
    1009                 :      * If we are splitting the root page (turning it from a leaf page into an
    1010                 :      * inner page), then no leaf tuples can go back to the current page; they
    1011                 :      * must all go somewhere else.
    1012                 :      */
    1013 GIC        2840 :     if (!SpGistBlockIsRoot(current->blkno))
    1014 CBC        2799 :         currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
    1015 ECB             :     else
    1016 GIC          41 :         currentFreeSpace = 0;   /* prevent assigning any tuples to current */
    1017 ECB             : 
    1018 GIC        2840 :     xlrec.initDest = false;
    1019 ECB             : 
    1020 GIC        2840 :     if (totalLeafSizes <= currentFreeSpace)
    1021 ECB             :     {
    1022                 :         /* All the leaf tuples will fit on current page */
    1023 GIC          12 :         newLeafBuffer = InvalidBuffer;
    1024 ECB             :         /* mark new leaf tuple as included in insertions, if allowed */
    1025 GIC          12 :         if (includeNew)
    1026 ECB             :         {
    1027 GIC          12 :             nToInsert++;
    1028 CBC          12 :             insertedNew = true;
    1029 ECB             :         }
    1030 GIC        1485 :         for (i = 0; i < nToInsert; i++)
    1031 CBC        1473 :             leafPageSelect[i] = 0;  /* signifies current page */
    1032 ECB             :     }
    1033 GIC        2828 :     else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
    1034 ECB             :     {
    1035                 :         /*
    1036                 :          * We're trying to split up a long value by repeated suffixing, but
    1037                 :          * it's not going to fit yet.  Don't bother allocating a second leaf
    1038                 :          * buffer that we won't be able to use.
    1039                 :          */
    1040 GIC          22 :         newLeafBuffer = InvalidBuffer;
    1041 CBC          22 :         Assert(includeNew);
    1042              22 :         Assert(nToInsert == 0);
    1043 ECB             :     }
    1044                 :     else
    1045                 :     {
    1046                 :         /* We will need another leaf page */
    1047                 :         uint8      *nodePageSelect;
    1048                 :         int         curspace;
    1049                 :         int         newspace;
    1050                 : 
    1051 GIC        2806 :         newLeafBuffer = SpGistGetBuffer(index,
    1052 ECB             :                                         GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
    1053 GIC        2806 :                                         Min(totalLeafSizes,
    1054 ECB             :                                             SPGIST_PAGE_CAPACITY),
    1055                 :                                         &xlrec.initDest);
    1056                 : 
    1057                 :         /*
    1058                 :          * Attempt to assign node groups to the two pages.  We might fail to
    1059                 :          * do so, even if totalLeafSizes is less than the available space,
    1060                 :          * because we can't split a group across pages.
    1061                 :          */
    1062 GIC        2806 :         nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
    1063 ECB             : 
    1064 GIC        2806 :         curspace = currentFreeSpace;
    1065 CBC        2806 :         newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
    1066           21955 :         for (i = 0; i < out.nNodes; i++)
    1067 ECB             :         {
    1068 GIC       19149 :             if (leafSizes[i] <= curspace)
    1069 ECB             :             {
    1070 GIC       12430 :                 nodePageSelect[i] = 0;  /* signifies current page */
    1071 CBC       12430 :                 curspace -= leafSizes[i];
    1072 ECB             :             }
    1073                 :             else
    1074                 :             {
    1075 GIC        6719 :                 nodePageSelect[i] = 1;  /* signifies new leaf page */
    1076 CBC        6719 :                 newspace -= leafSizes[i];
    1077 ECB             :             }
    1078                 :         }
    1079 GIC        2806 :         if (curspace >= 0 && newspace >= 0)
    1080 ECB             :         {
    1081                 :             /* Successful assignment, so we can include the new leaf tuple */
    1082 GIC        2691 :             if (includeNew)
    1083 ECB             :             {
    1084 GIC        2691 :                 nToInsert++;
    1085 CBC        2691 :                 insertedNew = true;
    1086 ECB             :             }
    1087                 :         }
    1088 GIC         115 :         else if (includeNew)
    1089 ECB             :         {
    1090                 :             /* We must exclude the new leaf tuple from the split */
    1091 GIC         115 :             int         nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
    1092 ECB             : 
    1093 GIC         115 :             leafSizes[nodeOfNewTuple] -=
    1094 CBC         115 :                 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
    1095 ECB             : 
    1096                 :             /* Repeat the node assignment process --- should succeed now */
    1097 GIC         115 :             curspace = currentFreeSpace;
    1098 CBC         115 :             newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
    1099             557 :             for (i = 0; i < out.nNodes; i++)
    1100 ECB             :             {
    1101 GIC         442 :                 if (leafSizes[i] <= curspace)
    1102 ECB             :                 {
    1103 GIC         142 :                     nodePageSelect[i] = 0;  /* signifies current page */
    1104 CBC         142 :                     curspace -= leafSizes[i];
    1105 ECB             :                 }
    1106                 :                 else
    1107                 :                 {
    1108 GIC         300 :                     nodePageSelect[i] = 1;  /* signifies new leaf page */
    1109 CBC         300 :                     newspace -= leafSizes[i];
    1110 ECB             :                 }
    1111                 :             }
    1112 GIC         115 :             if (curspace < 0 || newspace < 0)
    1113 LBC           0 :                 elog(ERROR, "failed to divide leaf tuple groups across pages");
    1114 EUB             :         }
    1115                 :         else
    1116                 :         {
    1117                 :             /* oops, we already excluded new tuple ... should not get here */
    1118 UIC           0 :             elog(ERROR, "failed to divide leaf tuple groups across pages");
    1119 EUB             :         }
    1120                 :         /* Expand the per-node assignments to be shown per leaf tuple */
    1121 GIC      363755 :         for (i = 0; i < nToInsert; i++)
    1122 ECB             :         {
    1123 GIC      360949 :             n = out.mapTuplesToNodes[i];
    1124 CBC      360949 :             leafPageSelect[i] = nodePageSelect[n];
    1125 ECB             :         }
    1126                 :     }
    1127                 : 
    1128                 :     /* Start preparing WAL record */
    1129 GIC        2840 :     xlrec.nDelete = 0;
    1130 CBC        2840 :     xlrec.initSrc = isNew;
    1131            2840 :     xlrec.storesNulls = isNulls;
    1132            2840 :     xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
    1133 ECB             : 
    1134 GIC        2840 :     leafdata = leafptr = (char *) palloc(totalLeafSizes);
    1135 ECB             : 
    1136                 :     /* Here we begin making the changes to the target pages */
    1137 GIC        2840 :     START_CRIT_SECTION();
    1138 ECB             : 
    1139                 :     /*
    1140                 :      * Delete old leaf tuples from current buffer, except when we're splitting
    1141                 :      * the root; in that case there's no need because we'll re-init the page
    1142                 :      * below.  We do this first to make room for reinserting new leaf tuples.
    1143                 :      */
    1144 GIC        2840 :     if (!SpGistBlockIsRoot(current->blkno))
    1145 ECB             :     {
    1146                 :         /*
    1147                 :          * Init buffer instead of deleting individual tuples, but only if
    1148                 :          * there aren't any other live tuples and only during build; otherwise
    1149                 :          * we need to set a redirection tuple for concurrent scans.
    1150                 :          */
    1151 GIC        2799 :         if (state->isBuild &&
    1152 CBC        2138 :             nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
    1153            2138 :             PageGetMaxOffsetNumber(current->page))
    1154 ECB             :         {
    1155 GIC         155 :             SpGistInitBuffer(current->buffer,
    1156 ECB             :                              SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
    1157 GIC         155 :             xlrec.initSrc = true;
    1158 ECB             :         }
    1159 GIC        2644 :         else if (isNew)
    1160 ECB             :         {
    1161                 :             /* don't expose the freshly init'd buffer as a backup block */
    1162 GIC          22 :             Assert(nToDelete == 0);
    1163 ECB             :         }
    1164                 :         else
    1165                 :         {
    1166 GIC        2622 :             xlrec.nDelete = nToDelete;
    1167 ECB             : 
    1168 GIC        2622 :             if (!state->isBuild)
    1169 ECB             :             {
    1170                 :                 /*
    1171                 :                  * Need to create redirect tuple (it will point to new inner
    1172                 :                  * tuple) but right now the new tuple's location is not known
    1173                 :                  * yet.  So, set the redirection pointer to "impossible" value
    1174                 :                  * and remember its position to update tuple later.
    1175                 :                  */
    1176 GIC         639 :                 if (nToDelete > 0)
    1177 CBC         639 :                     redirectTuplePos = toDelete[0];
    1178             639 :                 spgPageIndexMultiDelete(state, current->page,
    1179 ECB             :                                         toDelete, nToDelete,
    1180                 :                                         SPGIST_REDIRECT,
    1181                 :                                         SPGIST_PLACEHOLDER,
    1182                 :                                         SPGIST_METAPAGE_BLKNO,
    1183                 :                                         FirstOffsetNumber);
    1184                 :             }
    1185                 :             else
    1186                 :             {
    1187                 :                 /*
    1188                 :                  * During index build there is not concurrent searches, so we
    1189                 :                  * don't need to create redirection tuple.
    1190                 :                  */
    1191 GIC        1983 :                 spgPageIndexMultiDelete(state, current->page,
    1192 ECB             :                                         toDelete, nToDelete,
    1193                 :                                         SPGIST_PLACEHOLDER,
    1194                 :                                         SPGIST_PLACEHOLDER,
    1195                 :                                         InvalidBlockNumber,
    1196                 :                                         InvalidOffsetNumber);
    1197                 :             }
    1198                 :         }
    1199                 :     }
    1200                 : 
    1201                 :     /*
    1202                 :      * Put leaf tuples on proper pages, and update downlinks in innerTuple's
    1203                 :      * nodes.
    1204                 :      */
    1205 GIC        2840 :     startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
    1206 CBC      365262 :     for (i = 0; i < nToInsert; i++)
    1207 ECB             :     {
    1208 GIC      362422 :         SpGistLeafTuple it = newLeafs[i];
    1209 ECB             :         Buffer      leafBuffer;
    1210                 :         BlockNumber leafBlock;
    1211                 :         OffsetNumber newoffset;
    1212                 : 
    1213                 :         /* Which page is it going to? */
    1214 GIC      362422 :         leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
    1215 CBC      362422 :         leafBlock = BufferGetBlockNumber(leafBuffer);
    1216 ECB             : 
    1217                 :         /* Link tuple into correct chain for its node */
    1218 GIC      362422 :         n = out.mapTuplesToNodes[i];
    1219 ECB             : 
    1220 GIC      362422 :         if (ItemPointerIsValid(&nodes[n]->t_tid))
    1221 ECB             :         {
    1222 GIC      351516 :             Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
    1223 CBC      351516 :             SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
    1224 ECB             :         }
    1225                 :         else
    1226 GIC       10906 :             SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
    1227 ECB             : 
    1228                 :         /* Insert it on page */
    1229 GIC      362422 :         newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
    1230 CBC      362422 :                                          (Item) it, it->size,
    1231          362422 :                                          &startOffsets[leafPageSelect[i]],
    1232 ECB             :                                          false);
    1233 GIC      362422 :         toInsert[i] = newoffset;
    1234 ECB             : 
    1235                 :         /* ... and complete the chain linking */
    1236 GIC      362422 :         ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
    1237 ECB             : 
    1238                 :         /* Also copy leaf tuple into WAL data */
    1239 GIC      362422 :         memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
    1240 CBC      362422 :         leafptr += newLeafs[i]->size;
    1241 ECB             :     }
    1242                 : 
    1243                 :     /*
    1244                 :      * We're done modifying the other leaf buffer (if any), so mark it dirty.
    1245                 :      * current->buffer will be marked below, after we're entirely done
    1246                 :      * modifying it.
    1247                 :      */
    1248 GIC        2840 :     if (newLeafBuffer != InvalidBuffer)
    1249 ECB             :     {
    1250 GIC        2806 :         MarkBufferDirty(newLeafBuffer);
    1251 ECB             :     }
    1252                 : 
    1253                 :     /* Remember current buffer, since we're about to change "current" */
    1254 GIC        2840 :     saveCurrent = *current;
    1255 ECB             : 
    1256                 :     /*
    1257                 :      * Store the new innerTuple
    1258                 :      */
    1259 GIC        2840 :     if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
    1260 ECB             :     {
    1261                 :         /*
    1262                 :          * new inner tuple goes to parent page
    1263                 :          */
    1264 GIC        2415 :         Assert(current->buffer != parent->buffer);
    1265 ECB             : 
    1266                 :         /* Repoint "current" at the new inner tuple */
    1267 GIC        2415 :         current->blkno = parent->blkno;
    1268 CBC        2415 :         current->buffer = parent->buffer;
    1269            2415 :         current->page = parent->page;
    1270            2415 :         xlrec.offnumInner = current->offnum =
    1271            2415 :             SpGistPageAddNewItem(state, current->page,
    1272            2415 :                                  (Item) innerTuple, innerTuple->size,
    1273 ECB             :                                  NULL, false);
    1274                 : 
    1275                 :         /*
    1276                 :          * Update parent node link and mark parent page dirty
    1277                 :          */
    1278 GIC        2415 :         xlrec.innerIsParent = true;
    1279 CBC        2415 :         xlrec.offnumParent = parent->offnum;
    1280            2415 :         xlrec.nodeI = parent->node;
    1281            2415 :         saveNodeLink(index, parent, current->blkno, current->offnum);
    1282 ECB             : 
    1283                 :         /*
    1284                 :          * Update redirection link (in old current buffer)
    1285                 :          */
    1286 GIC        2415 :         if (redirectTuplePos != InvalidOffsetNumber)
    1287 CBC         577 :             setRedirectionTuple(&saveCurrent, redirectTuplePos,
    1288             577 :                                 current->blkno, current->offnum);
    1289 ECB             : 
    1290                 :         /* Done modifying old current buffer, mark it dirty */
    1291 GIC        2415 :         MarkBufferDirty(saveCurrent.buffer);
    1292 ECB             :     }
    1293 GIC         425 :     else if (parent->buffer != InvalidBuffer)
    1294 ECB             :     {
    1295                 :         /*
    1296                 :          * new inner tuple will be stored on a new page
    1297                 :          */
    1298 GIC         384 :         Assert(newInnerBuffer != InvalidBuffer);
    1299 ECB             : 
    1300                 :         /* Repoint "current" at the new inner tuple */
    1301 GIC         384 :         current->buffer = newInnerBuffer;
    1302 CBC         384 :         current->blkno = BufferGetBlockNumber(current->buffer);
    1303             384 :         current->page = BufferGetPage(current->buffer);
    1304             384 :         xlrec.offnumInner = current->offnum =
    1305             384 :             SpGistPageAddNewItem(state, current->page,
    1306             384 :                                  (Item) innerTuple, innerTuple->size,
    1307 ECB             :                                  NULL, false);
    1308                 : 
    1309                 :         /* Done modifying new current buffer, mark it dirty */
    1310 GIC         384 :         MarkBufferDirty(current->buffer);
    1311 ECB             : 
    1312                 :         /*
    1313                 :          * Update parent node link and mark parent page dirty
    1314                 :          */
    1315 GIC         384 :         xlrec.innerIsParent = (parent->buffer == current->buffer);
    1316 CBC         384 :         xlrec.offnumParent = parent->offnum;
    1317             384 :         xlrec.nodeI = parent->node;
    1318             384 :         saveNodeLink(index, parent, current->blkno, current->offnum);
    1319 ECB             : 
    1320                 :         /*
    1321                 :          * Update redirection link (in old current buffer)
    1322                 :          */
    1323 GIC         384 :         if (redirectTuplePos != InvalidOffsetNumber)
    1324 CBC          62 :             setRedirectionTuple(&saveCurrent, redirectTuplePos,
    1325              62 :                                 current->blkno, current->offnum);
    1326 ECB             : 
    1327                 :         /* Done modifying old current buffer, mark it dirty */
    1328 GIC         384 :         MarkBufferDirty(saveCurrent.buffer);
    1329 ECB             :     }
    1330                 :     else
    1331                 :     {
    1332                 :         /*
    1333                 :          * Splitting root page, which was a leaf but now becomes inner page
    1334                 :          * (and so "current" continues to point at it)
    1335                 :          */
    1336 GIC          41 :         Assert(SpGistBlockIsRoot(current->blkno));
    1337 CBC          41 :         Assert(redirectTuplePos == InvalidOffsetNumber);
    1338 ECB             : 
    1339 GIC          41 :         SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
    1340 CBC          41 :         xlrec.initInner = true;
    1341              41 :         xlrec.innerIsParent = false;
    1342 ECB             : 
    1343 GIC          41 :         xlrec.offnumInner = current->offnum =
    1344 CBC          41 :             PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
    1345 ECB             :                         InvalidOffsetNumber, false, false);
    1346 GIC          41 :         if (current->offnum != FirstOffsetNumber)
    1347 LBC           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1348 EUB             :                  innerTuple->size);
    1349                 : 
    1350                 :         /* No parent link to update, nor redirection to do */
    1351 GIC          41 :         xlrec.offnumParent = InvalidOffsetNumber;
    1352 CBC          41 :         xlrec.nodeI = 0;
    1353 ECB             : 
    1354                 :         /* Done modifying new current buffer, mark it dirty */
    1355 GIC          41 :         MarkBufferDirty(current->buffer);
    1356 ECB             : 
    1357                 :         /* saveCurrent doesn't represent a different buffer */
    1358 GIC          41 :         saveCurrent.buffer = InvalidBuffer;
    1359 ECB             :     }
    1360                 : 
    1361 GIC        2840 :     if (RelationNeedsWAL(index) && !state->isBuild)
    1362 ECB             :     {
    1363                 :         XLogRecPtr  recptr;
    1364                 :         int         flags;
    1365                 : 
    1366 GIC         671 :         XLogBeginInsert();
    1367 ECB             : 
    1368 GIC         671 :         xlrec.nInsert = nToInsert;
    1369 CBC         671 :         XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
    1370 ECB             : 
    1371 GIC         671 :         XLogRegisterData((char *) toDelete,
    1372 CBC         671 :                          sizeof(OffsetNumber) * xlrec.nDelete);
    1373             671 :         XLogRegisterData((char *) toInsert,
    1374             671 :                          sizeof(OffsetNumber) * xlrec.nInsert);
    1375             671 :         XLogRegisterData((char *) leafPageSelect,
    1376             671 :                          sizeof(uint8) * xlrec.nInsert);
    1377             671 :         XLogRegisterData((char *) innerTuple, innerTuple->size);
    1378             671 :         XLogRegisterData(leafdata, leafptr - leafdata);
    1379 ECB             : 
    1380                 :         /* Old leaf page */
    1381 GIC         671 :         if (BufferIsValid(saveCurrent.buffer))
    1382 ECB             :         {
    1383 GIC         661 :             flags = REGBUF_STANDARD;
    1384 CBC         661 :             if (xlrec.initSrc)
    1385              22 :                 flags |= REGBUF_WILL_INIT;
    1386             661 :             XLogRegisterBuffer(0, saveCurrent.buffer, flags);
    1387 ECB             :         }
    1388                 : 
    1389                 :         /* New leaf page */
    1390 GIC         671 :         if (BufferIsValid(newLeafBuffer))
    1391 ECB             :         {
    1392 GIC         649 :             flags = REGBUF_STANDARD;
    1393 CBC         649 :             if (xlrec.initDest)
    1394             604 :                 flags |= REGBUF_WILL_INIT;
    1395             649 :             XLogRegisterBuffer(1, newLeafBuffer, flags);
    1396 ECB             :         }
    1397                 : 
    1398                 :         /* Inner page */
    1399 GIC         671 :         flags = REGBUF_STANDARD;
    1400 CBC         671 :         if (xlrec.initInner)
    1401              20 :             flags |= REGBUF_WILL_INIT;
    1402             671 :         XLogRegisterBuffer(2, current->buffer, flags);
    1403 ECB             : 
    1404                 :         /* Parent page, if different from inner page */
    1405 GIC         671 :         if (parent->buffer != InvalidBuffer)
    1406 ECB             :         {
    1407 GIC         661 :             if (parent->buffer != current->buffer)
    1408 CBC          62 :                 XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
    1409 ECB             :             else
    1410 GIC         599 :                 Assert(xlrec.innerIsParent);
    1411 ECB             :         }
    1412                 : 
    1413                 :         /* Issue the WAL record */
    1414 GIC         671 :         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
    1415 ECB             : 
    1416                 :         /* Update page LSNs on all affected pages */
    1417 GIC         671 :         if (newLeafBuffer != InvalidBuffer)
    1418 ECB             :         {
    1419 GIC         649 :             Page        page = BufferGetPage(newLeafBuffer);
    1420 ECB             : 
    1421 GIC         649 :             PageSetLSN(page, recptr);
    1422 ECB             :         }
    1423                 : 
    1424 GIC         671 :         if (saveCurrent.buffer != InvalidBuffer)
    1425 ECB             :         {
    1426 GIC         661 :             Page        page = BufferGetPage(saveCurrent.buffer);
    1427 ECB             : 
    1428 GIC         661 :             PageSetLSN(page, recptr);
    1429 ECB             :         }
    1430                 : 
    1431 GIC         671 :         PageSetLSN(current->page, recptr);
    1432 ECB             : 
    1433 GIC         671 :         if (parent->buffer != InvalidBuffer)
    1434 ECB             :         {
    1435 GIC         661 :             PageSetLSN(parent->page, recptr);
    1436 ECB             :         }
    1437                 :     }
    1438                 : 
    1439 GIC        2840 :     END_CRIT_SECTION();
    1440 ECB             : 
    1441                 :     /* Update local free-space cache and unlock buffers */
    1442 GIC        2840 :     if (newLeafBuffer != InvalidBuffer)
    1443 ECB             :     {
    1444 GIC        2806 :         SpGistSetLastUsedPage(index, newLeafBuffer);
    1445 CBC        2806 :         UnlockReleaseBuffer(newLeafBuffer);
    1446 ECB             :     }
    1447 GIC        2840 :     if (saveCurrent.buffer != InvalidBuffer)
    1448 ECB             :     {
    1449 GIC        2799 :         SpGistSetLastUsedPage(index, saveCurrent.buffer);
    1450 CBC        2799 :         UnlockReleaseBuffer(saveCurrent.buffer);
    1451 ECB             :     }
    1452                 : 
    1453 GIC        2840 :     return insertedNew;
    1454 ECB             : }
    1455                 : 
    1456                 : /*
    1457                 :  * spgMatchNode action: descend to N'th child node of current inner tuple
                         :  *
                         :  * nodeN selects which node of innerTuple to follow.  The previous parent
                         :  * buffer, if there is one and it is distinct from current's buffer, is
                         :  * passed to SpGistSetLastUsedPage and then unlocked and released.
                         :  * "parent" is then overwritten with current's block number, buffer, page
                         :  * and offset, and its node field is set to nodeN.
                         :  *
                         :  * On exit, current->blkno and current->offnum hold the selected node's
                         :  * downlink target, or InvalidBlockNumber and InvalidOffsetNumber when
                         :  * the downlink is not valid.  current->buffer and current->page are
                         :  * always reset to InvalidBuffer and NULL; presumably the caller is
                         :  * expected to fetch the target page itself (not visible here, confirm
                         :  * against the caller).
                         :  *
                         :  * Reports elog(ERROR) if no node numbered nodeN is found in innerTuple.
                         :  * The "state" argument is not referenced in this function's body.
    1458                 :  */
    1459                 : static void
    1460 GIC     9331580 : spgMatchNodeAction(Relation index, SpGistState *state,
    1461 ECB             :                    SpGistInnerTuple innerTuple,
    1462                 :                    SPPageDesc *current, SPPageDesc *parent, int nodeN)
    1463                 : {
    1464                 :     int         i;
    1465                 :     SpGistNodeTuple node;
    1466                 : 
    1467                 :     /*
                         :      * Release previous parent buffer if any.  This is skipped when parent
                         :      * and current share a buffer, because that buffer becomes the new
                         :      * parent's buffer in the assignments just below.
                         :      */
    1468 GIC     9331580 :     if (parent->buffer != InvalidBuffer &&
    1469 CBC     8936882 :         parent->buffer != current->buffer)
    1470 ECB             :     {
    1471 GIC      392309 :         SpGistSetLastUsedPage(index, parent->buffer);
    1472 CBC      392309 :         UnlockReleaseBuffer(parent->buffer);
    1473 ECB             :     }
    1474                 : 
    1475                 :     /* Repoint parent to specified node of current inner tuple */
    1476 GIC     9331580 :     parent->blkno = current->blkno;
    1477 CBC     9331580 :     parent->buffer = current->buffer;
    1478         9331580 :     parent->page = current->page;
    1479         9331580 :     parent->offnum = current->offnum;
    1480         9331580 :     parent->node = nodeN;
    1481 ECB             : 
    1482                 :     /*
                         :      * Locate that node.  The loop stops as soon as the iteration index
                         :      * equals nodeN; the i != nodeN check after the loop reports an error
                         :      * when no such node was reached.
                         :      */
    1483 GIC    15631807 :     SGITITERATE(innerTuple, i, node)
    1484 ECB             :     {
    1485 GIC    15631807 :         if (i == nodeN)
    1486 CBC     9331580 :             break;
    1487 ECB             :     }
    1488                 : 
    1489 GIC     9331580 :     if (i != nodeN)
    1490 LBC           0 :         elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
    1491 EUB             :              nodeN);
    1492                 : 
    1493                 :     /* Point current to the downlink location, if any */
    1494 GIC     9331580 :     if (ItemPointerIsValid(&node->t_tid))
    1495 ECB             :     {
    1496 GIC     9330543 :         current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
    1497 CBC     9330543 :         current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
    1498 ECB             :     }
    1499                 :     else
    1500                 :     {
    1501                 :         /* Downlink is empty, so we'll need to find a new page */
    1502 GIC        1037 :         current->blkno = InvalidBlockNumber;
    1503 CBC        1037 :         current->offnum = InvalidOffsetNumber;
    1504 ECB             :     }
    1505                 : 
    1506 GIC     9331580 :     current->buffer = InvalidBuffer;
    1507 CBC     9331580 :     current->page = NULL;
    1508         9331580 : }
    1509 ECB             : 
    1510                 : /*
    1511                 :  * spgAddNode action: add a node to the inner tuple at current
    1512                 :  */
    1513                 : static void
    1514 GIC         783 : spgAddNodeAction(Relation index, SpGistState *state,
    1515 ECB             :                  SpGistInnerTuple innerTuple,
    1516                 :                  SPPageDesc *current, SPPageDesc *parent,
    1517                 :                  int nodeN, Datum nodeLabel)
    1518                 : {
    1519                 :     SpGistInnerTuple newInnerTuple;
    1520                 :     spgxlogAddNode xlrec;
    1521                 : 
    1522                 :     /* Should not be applied to nulls */
    1523 GIC         783 :     Assert(!SpGistPageStoresNulls(current->page));
    1524 ECB             : 
    1525                 :     /* Construct new inner tuple with additional node */
    1526 GIC         783 :     newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
    1527 ECB             : 
    1528                 :     /* Prepare WAL record */
    1529 GIC         783 :     STORE_STATE(state, xlrec.stateSrc);
    1530 CBC         783 :     xlrec.offnum = current->offnum;
    1531 ECB             : 
    1532                 :     /* we don't fill these unless we need to change the parent downlink */
    1533 GIC         783 :     xlrec.parentBlk = -1;
    1534 CBC         783 :     xlrec.offnumParent = InvalidOffsetNumber;
    1535             783 :     xlrec.nodeI = 0;
    1536 ECB             : 
    1537                 :     /* we don't fill these unless tuple has to be moved */
    1538 GIC         783 :     xlrec.offnumNew = InvalidOffsetNumber;
    1539 CBC         783 :     xlrec.newPage = false;
    1540 ECB             : 
    1541 GIC         783 :     if (PageGetExactFreeSpace(current->page) >=
    1542 CBC         783 :         newInnerTuple->size - innerTuple->size)
    1543 ECB             :     {
    1544                 :         /*
    1545                 :          * We can replace the inner tuple by new version in-place
    1546                 :          */
    1547 GIC         780 :         START_CRIT_SECTION();
    1548 ECB             : 
    1549 GIC         780 :         PageIndexTupleDelete(current->page, current->offnum);
    1550 CBC         780 :         if (PageAddItem(current->page,
    1551 ECB             :                         (Item) newInnerTuple, newInnerTuple->size,
    1552 GIC         780 :                         current->offnum, false, false) != current->offnum)
    1553 LBC           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1554 EUB             :                  newInnerTuple->size);
    1555                 : 
    1556 GIC         780 :         MarkBufferDirty(current->buffer);
    1557 ECB             : 
    1558 GIC         780 :         if (RelationNeedsWAL(index) && !state->isBuild)
    1559 ECB             :         {
    1560                 :             XLogRecPtr  recptr;
    1561                 : 
    1562 GIC         355 :             XLogBeginInsert();
    1563 CBC         355 :             XLogRegisterData((char *) &xlrec, sizeof(xlrec));
    1564             355 :             XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
    1565 ECB             : 
    1566 GIC         355 :             XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
    1567 ECB             : 
    1568 GIC         355 :             recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
    1569 ECB             : 
    1570 GIC         355 :             PageSetLSN(current->page, recptr);
    1571 ECB             :         }
    1572                 : 
    1573 GIC         780 :         END_CRIT_SECTION();
    1574 ECB             :     }
    1575                 :     else
    1576                 :     {
    1577                 :         /*
    1578                 :          * move inner tuple to another page, and update parent
    1579                 :          */
    1580                 :         SpGistDeadTuple dt;
    1581                 :         SPPageDesc  saveCurrent;
    1582                 : 
    1583                 :         /*
    1584                 :          * It should not be possible to get here for the root page, since we
    1585                 :          * allow only one inner tuple on the root page, and spgFormInnerTuple
    1586                 :          * always checks that inner tuples don't exceed the size of a page.
    1587                 :          */
    1588 GIC           3 :         if (SpGistBlockIsRoot(current->blkno))
    1589 LBC           0 :             elog(ERROR, "cannot enlarge root tuple any more");
    1590 GBC           3 :         Assert(parent->buffer != InvalidBuffer);
    1591 ECB             : 
    1592 GIC           3 :         saveCurrent = *current;
    1593 ECB             : 
    1594 GIC           3 :         xlrec.offnumParent = parent->offnum;
    1595 CBC           3 :         xlrec.nodeI = parent->node;
    1596 ECB             : 
    1597                 :         /*
    1598                 :          * obtain new buffer with the same parity as current, since it will be
    1599                 :          * a child of same parent tuple
    1600                 :          */
    1601 GIC           6 :         current->buffer = SpGistGetBuffer(index,
    1602 CBC           3 :                                           GBUF_INNER_PARITY(current->blkno),
    1603               3 :                                           newInnerTuple->size + sizeof(ItemIdData),
    1604 ECB             :                                           &xlrec.newPage);
    1605 GIC           3 :         current->blkno = BufferGetBlockNumber(current->buffer);
    1606 CBC           3 :         current->page = BufferGetPage(current->buffer);
    1607 ECB             : 
    1608                 :         /*
    1609                 :          * Let's just make real sure new current isn't same as old.  Right now
    1610                 :          * that's impossible, but if SpGistGetBuffer ever got smart enough to
    1611                 :          * delete placeholder tuples before checking space, maybe it wouldn't
    1612                 :          * be impossible.  The case would appear to work except that WAL
    1613                 :          * replay would be subtly wrong, so I think a mere assert isn't enough
    1614                 :          * here.
    1615                 :          */
    1616 GIC           3 :         if (current->blkno == saveCurrent.blkno)
    1617 LBC           0 :             elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
    1618 EUB             : 
    1619                 :         /*
    1620                 :          * New current and parent buffer will both be modified; but note that
    1621                 :          * parent buffer could be same as either new or old current.
    1622                 :          */
    1623 GIC           3 :         if (parent->buffer == saveCurrent.buffer)
    1624 LBC           0 :             xlrec.parentBlk = 0;
    1625 GBC           3 :         else if (parent->buffer == current->buffer)
    1626 LBC           0 :             xlrec.parentBlk = 1;
    1627 EUB             :         else
    1628 GIC           3 :             xlrec.parentBlk = 2;
    1629 ECB             : 
    1630 GIC           3 :         START_CRIT_SECTION();
    1631 ECB             : 
    1632                 :         /* insert new ... */
    1633 GIC           3 :         xlrec.offnumNew = current->offnum =
    1634 CBC           3 :             SpGistPageAddNewItem(state, current->page,
    1635               3 :                                  (Item) newInnerTuple, newInnerTuple->size,
    1636 ECB             :                                  NULL, false);
    1637                 : 
    1638 GIC           3 :         MarkBufferDirty(current->buffer);
    1639 ECB             : 
    1640                 :         /* update parent's downlink and mark parent page dirty */
    1641 GIC           3 :         saveNodeLink(index, parent, current->blkno, current->offnum);
    1642 ECB             : 
    1643                 :         /*
    1644                 :          * Replace old tuple with a placeholder or redirection tuple.  Unless
    1645                 :          * doing an index build, we have to insert a redirection tuple for
    1646                 :          * possible concurrent scans.  We can't just delete it in any case,
    1647                 :          * because that could change the offsets of other tuples on the page,
    1648                 :          * breaking downlinks from their parents.
    1649                 :          */
    1650 GIC           3 :         if (state->isBuild)
    1651 LBC           0 :             dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
    1652 EUB             :                                   InvalidBlockNumber, InvalidOffsetNumber);
    1653                 :         else
    1654 GIC           3 :             dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
    1655 CBC           3 :                                   current->blkno, current->offnum);
    1656 ECB             : 
    1657 GIC           3 :         PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
    1658 CBC           3 :         if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
    1659 ECB             :                         saveCurrent.offnum,
    1660 GIC           3 :                         false, false) != saveCurrent.offnum)
    1661 LBC           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1662 EUB             :                  dt->size);
    1663                 : 
    1664 GIC           3 :         if (state->isBuild)
    1665 LBC           0 :             SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
    1666 EUB             :         else
    1667 GIC           3 :             SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
    1668 ECB             : 
    1669 GIC           3 :         MarkBufferDirty(saveCurrent.buffer);
    1670 ECB             : 
    1671 GIC           3 :         if (RelationNeedsWAL(index) && !state->isBuild)
    1672 ECB             :         {
    1673                 :             XLogRecPtr  recptr;
    1674                 :             int         flags;
    1675                 : 
    1676 GIC           3 :             XLogBeginInsert();
    1677 ECB             : 
    1678                 :             /* orig page */
    1679 GIC           3 :             XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
    1680 ECB             :             /* new page */
    1681 GIC           3 :             flags = REGBUF_STANDARD;
    1682 CBC           3 :             if (xlrec.newPage)
    1683               3 :                 flags |= REGBUF_WILL_INIT;
    1684               3 :             XLogRegisterBuffer(1, current->buffer, flags);
    1685 ECB             :             /* parent page (if different from orig and new) */
    1686 GIC           3 :             if (xlrec.parentBlk == 2)
    1687 CBC           3 :                 XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
    1688 ECB             : 
    1689 GIC           3 :             XLogRegisterData((char *) &xlrec, sizeof(xlrec));
    1690 CBC           3 :             XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
    1691 ECB             : 
    1692 GIC           3 :             recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
    1693 ECB             : 
    1694                 :             /* we don't bother to check if any of these are redundant */
    1695 GIC           3 :             PageSetLSN(current->page, recptr);
    1696 CBC           3 :             PageSetLSN(parent->page, recptr);
    1697               3 :             PageSetLSN(saveCurrent.page, recptr);
    1698 ECB             :         }
    1699                 : 
    1700 GIC           3 :         END_CRIT_SECTION();
    1701 ECB             : 
    1702                 :         /* Release saveCurrent if it's not same as current or parent */
    1703 GIC           3 :         if (saveCurrent.buffer != current->buffer &&
    1704 CBC           3 :             saveCurrent.buffer != parent->buffer)
    1705 ECB             :         {
    1706 GIC           3 :             SpGistSetLastUsedPage(index, saveCurrent.buffer);
    1707 CBC           3 :             UnlockReleaseBuffer(saveCurrent.buffer);
    1708 ECB             :         }
    1709                 :     }
    1710 GIC         783 : }
    1711 ECB             : 
    1712                 : /*
    1713                 :  * spgSplitNode action: split inner tuple at current into prefix and postfix
    1714                 :  */
    1715                 : static void
    1716 GIC         321 : spgSplitNodeAction(Relation index, SpGistState *state,
    1717 ECB             :                    SpGistInnerTuple innerTuple,
    1718                 :                    SPPageDesc *current, spgChooseOut *out)
    1719                 : {
    1720                 :     SpGistInnerTuple prefixTuple,
    1721                 :                 postfixTuple;
    1722                 :     SpGistNodeTuple node,
    1723                 :                *nodes;
    1724                 :     BlockNumber postfixBlkno;
    1725                 :     OffsetNumber postfixOffset;
    1726                 :     int         i;
    1727                 :     spgxlogSplitTuple xlrec;
    1728 GIC         321 :     Buffer      newBuffer = InvalidBuffer;
    1729 ECB             : 
    1730                 :     /* Should not be applied to nulls */
    1731 GIC         321 :     Assert(!SpGistPageStoresNulls(current->page));
    1732 ECB             : 
    1733                 :     /* Check opclass gave us sane values */
    1734 GIC         321 :     if (out->result.splitTuple.prefixNNodes <= 0 ||
    1735 CBC         321 :         out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
    1736 LBC           0 :         elog(ERROR, "invalid number of prefix nodes: %d",
    1737 EUB             :              out->result.splitTuple.prefixNNodes);
    1738 GIC         321 :     if (out->result.splitTuple.childNodeN < 0 ||
    1739 CBC         321 :         out->result.splitTuple.childNodeN >=
    1740             321 :         out->result.splitTuple.prefixNNodes)
    1741 LBC           0 :         elog(ERROR, "invalid child node number: %d",
    1742 EUB             :              out->result.splitTuple.childNodeN);
    1743                 : 
    1744                 :     /*
    1745                 :      * Construct new prefix tuple with requested number of nodes.  We'll fill
    1746                 :      * in the childNodeN'th node's downlink below.
    1747                 :      */
    1748 GIC         321 :     nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
    1749 CBC         321 :                                        out->result.splitTuple.prefixNNodes);
    1750 ECB             : 
    1751 GIC         642 :     for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
    1752 ECB             :     {
    1753 GIC         321 :         Datum       label = (Datum) 0;
    1754 ECB             :         bool        labelisnull;
    1755                 : 
    1756 GIC         321 :         labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
    1757 CBC         321 :         if (!labelisnull)
    1758             321 :             label = out->result.splitTuple.prefixNodeLabels[i];
    1759             321 :         nodes[i] = spgFormNodeTuple(state, label, labelisnull);
    1760 ECB             :     }
    1761                 : 
    1762 GIC         321 :     prefixTuple = spgFormInnerTuple(state,
    1763 CBC         321 :                                     out->result.splitTuple.prefixHasPrefix,
    1764 ECB             :                                     out->result.splitTuple.prefixPrefixDatum,
    1765                 :                                     out->result.splitTuple.prefixNNodes,
    1766                 :                                     nodes);
    1767                 : 
    1768                 :     /* it must fit in the space that innerTuple now occupies */
    1769 GIC         321 :     if (prefixTuple->size > innerTuple->size)
    1770 LBC           0 :         elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
    1771 EUB             : 
    1772                 :     /*
    1773                 :      * Construct new postfix tuple, containing all nodes of innerTuple with
    1774                 :      * same node datums, but with the prefix specified by the opclass's
    1775                 :      * choose function.
    1776                 :      */
    1777 GIC         321 :     nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
    1778 CBC        1121 :     SGITITERATE(innerTuple, i, node)
    1779 ECB             :     {
    1780 GIC         800 :         nodes[i] = node;
    1781 ECB             :     }
    1782                 : 
    1783 GIC         321 :     postfixTuple = spgFormInnerTuple(state,
    1784 CBC         321 :                                      out->result.splitTuple.postfixHasPrefix,
    1785 ECB             :                                      out->result.splitTuple.postfixPrefixDatum,
    1786 GIC         321 :                                      innerTuple->nNodes, nodes);
    1787 ECB             : 
    1788                 :     /* Postfix tuple is allTheSame if original tuple was */
    1789 GIC         321 :     postfixTuple->allTheSame = innerTuple->allTheSame;
    1790 ECB             : 
    1791                 :     /* prep data for WAL record */
    1792 GIC         321 :     xlrec.newPage = false;
    1793 ECB             : 
    1794                 :     /*
    1795                 :      * If we can't fit both tuples on the current page, get a new page for the
    1796                 :      * postfix tuple.  In particular, can't split to the root page.
    1797                 :      *
    1798                 :      * For the space calculation, note that prefixTuple replaces innerTuple
    1799                 :      * but postfixTuple will be a new entry.
    1800                 :      */
    1801 GIC         321 :     if (SpGistBlockIsRoot(current->blkno) ||
    1802 CBC         315 :         SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
    1803             315 :         prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
    1804 ECB             :     {
    1805                 :         /*
    1806                 :          * Choose page with next triple parity, because postfix tuple is a
    1807                 :          * child of prefix one
    1808                 :          */
    1809 GIC          84 :         newBuffer = SpGistGetBuffer(index,
    1810 CBC          84 :                                     GBUF_INNER_PARITY(current->blkno + 1),
    1811              84 :                                     postfixTuple->size + sizeof(ItemIdData),
    1812 ECB             :                                     &xlrec.newPage);
    1813                 :     }
    1814                 : 
    1815 GIC         321 :     START_CRIT_SECTION();
    1816 ECB             : 
    1817                 :     /*
    1818                 :      * Replace old tuple by prefix tuple
    1819                 :      */
    1820 GIC         321 :     PageIndexTupleDelete(current->page, current->offnum);
    1821 CBC         321 :     xlrec.offnumPrefix = PageAddItem(current->page,
    1822 ECB             :                                      (Item) prefixTuple, prefixTuple->size,
    1823                 :                                      current->offnum, false, false);
    1824 GIC         321 :     if (xlrec.offnumPrefix != current->offnum)
    1825 LBC           0 :         elog(ERROR, "failed to add item of size %u to SPGiST index page",
    1826 EUB             :              prefixTuple->size);
    1827                 : 
    1828                 :     /*
    1829                 :      * put postfix tuple into appropriate page
    1830                 :      */
    1831 GIC         321 :     if (newBuffer == InvalidBuffer)
    1832 ECB             :     {
    1833 GIC         237 :         postfixBlkno = current->blkno;
    1834 CBC         237 :         xlrec.offnumPostfix = postfixOffset =
    1835             237 :             SpGistPageAddNewItem(state, current->page,
    1836             237 :                                  (Item) postfixTuple, postfixTuple->size,
    1837 ECB             :                                  NULL, false);
    1838 GIC         237 :         xlrec.postfixBlkSame = true;
    1839 ECB             :     }
    1840                 :     else
    1841                 :     {
    1842 GIC          84 :         postfixBlkno = BufferGetBlockNumber(newBuffer);
    1843 CBC          84 :         xlrec.offnumPostfix = postfixOffset =
    1844              84 :             SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
    1845              84 :                                  (Item) postfixTuple, postfixTuple->size,
    1846 ECB             :                                  NULL, false);
    1847 GIC          84 :         MarkBufferDirty(newBuffer);
    1848 CBC          84 :         xlrec.postfixBlkSame = false;
    1849 ECB             :     }
    1850                 : 
    1851                 :     /*
    1852                 :      * And set downlink pointer in the prefix tuple to point to postfix tuple.
    1853                 :      * (We can't avoid this step by doing the above two steps in opposite
    1854                 :      * order, because there might not be enough space on the page to insert
    1855                 :      * the postfix tuple first.)  We have to update the local copy of the
    1856                 :      * prefixTuple too, because that's what will be written to WAL.
    1857                 :      */
    1858 GIC         321 :     spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
    1859 ECB             :                       postfixBlkno, postfixOffset);
    1860 GIC         321 :     prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
    1861 CBC         321 :                                                  PageGetItemId(current->page, current->offnum));
    1862             321 :     spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
    1863 ECB             :                       postfixBlkno, postfixOffset);
    1864                 : 
    1865 GIC         321 :     MarkBufferDirty(current->buffer);
    1866 ECB             : 
    1867 GIC         321 :     if (RelationNeedsWAL(index) && !state->isBuild)
    1868 ECB             :     {
    1869                 :         XLogRecPtr  recptr;
    1870                 : 
    1871 GIC         306 :         XLogBeginInsert();
    1872 CBC         306 :         XLogRegisterData((char *) &xlrec, sizeof(xlrec));
    1873             306 :         XLogRegisterData((char *) prefixTuple, prefixTuple->size);
    1874             306 :         XLogRegisterData((char *) postfixTuple, postfixTuple->size);
    1875 ECB             : 
    1876 GIC         306 :         XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
    1877 CBC         306 :         if (newBuffer != InvalidBuffer)
    1878 ECB             :         {
    1879                 :             int         flags;
    1880                 : 
    1881 GIC          81 :             flags = REGBUF_STANDARD;
    1882 CBC          81 :             if (xlrec.newPage)
    1883               3 :                 flags |= REGBUF_WILL_INIT;
    1884              81 :             XLogRegisterBuffer(1, newBuffer, flags);
    1885 ECB             :         }
    1886                 : 
    1887 GIC         306 :         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
    1888 ECB             : 
    1889 GIC         306 :         PageSetLSN(current->page, recptr);
    1890 ECB             : 
    1891 GIC         306 :         if (newBuffer != InvalidBuffer)
    1892 ECB             :         {
    1893 GIC          81 :             PageSetLSN(BufferGetPage(newBuffer), recptr);
    1894 ECB             :         }
    1895                 :     }
    1896                 : 
    1897 GIC         321 :     END_CRIT_SECTION();
    1898 ECB             : 
    1899                 :     /* Update local free-space cache and release buffer */
    1900 GIC         321 :     if (newBuffer != InvalidBuffer)
    1901 ECB             :     {
    1902 GIC          84 :         SpGistSetLastUsedPage(index, newBuffer);
    1903 CBC          84 :         UnlockReleaseBuffer(newBuffer);
    1904 ECB             :     }
    1905 GIC         321 : }
    1906 ECB             : 
    1907                 : /*
    1908                 :  * Insert one item into the index.
    1909                 :  *
    1910                 :  * Returns true on success, false if we failed to complete the insertion
    1911                 :  * (typically because of conflict with a concurrent insert).  In the latter
    1912                 :  * case, caller should re-call spgdoinsert() with the same args.
    1913                 :  */
    1914                 : bool
    1915 GIC      403184 : spgdoinsert(Relation index, SpGistState *state,
    1916 ECB             :             ItemPointer heapPtr, Datum *datums, bool *isnulls)
    1917                 : {
    1918 GIC      403184 :     bool        result = true;
    1919 CBC      403184 :     TupleDesc   leafDescriptor = state->leafTupDesc;
    1920          403184 :     bool        isnull = isnulls[spgKeyColumn];
    1921          403184 :     int         level = 0;
    1922 ECB             :     Datum       leafDatums[INDEX_MAX_KEYS];
    1923                 :     int         leafSize;
    1924                 :     int         bestLeafSize;
    1925 GIC      403184 :     int         numNoProgressCycles = 0;
    1926 ECB             :     SPPageDesc  current,
    1927                 :                 parent;
    1928 GIC      403184 :     FmgrInfo   *procinfo = NULL;
    1929 ECB             : 
    1930                 :     /*
    1931                 :      * Look up FmgrInfo of the user-defined choose function once, to save
    1932                 :      * cycles in the loop below.
    1933                 :      */
    1934 GIC      403184 :     if (!isnull)
    1935 CBC      403139 :         procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
    1936 ECB             : 
    1937                 :     /*
    1938                 :      * Prepare the leaf datum to insert.
    1939                 :      *
    1940                 :      * If an optional "compress" method is provided, then call it to form the
    1941                 :      * leaf key datum from the input datum.  Otherwise, store the input datum
    1942                 :      * as is.  Since we don't use index_form_tuple in this AM, we have to make
    1943                 :      * sure value to be inserted is not toasted; FormIndexDatum doesn't
    1944                 :      * guarantee that.  But we assume the "compress" method to return an
    1945                 :      * untoasted value.
    1946                 :      */
    1947 GIC      403184 :     if (!isnull)
    1948 ECB             :     {
    1949 GIC      403139 :         if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
    1950 ECB             :         {
    1951 GIC       39622 :             FmgrInfo   *compressProcinfo = NULL;
    1952 ECB             : 
    1953 GIC       39622 :             compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
    1954 CBC       39622 :             leafDatums[spgKeyColumn] =
    1955           39622 :                 FunctionCall1Coll(compressProcinfo,
    1956           39622 :                                   index->rd_indcollation[spgKeyColumn],
    1957 ECB             :                                   datums[spgKeyColumn]);
    1958                 :         }
    1959                 :         else
    1960                 :         {
    1961 GIC      363517 :             Assert(state->attLeafType.type == state->attType.type);
    1962 ECB             : 
    1963 GIC      363517 :             if (state->attType.attlen == -1)
    1964 CBC       90332 :                 leafDatums[spgKeyColumn] =
    1965           90332 :                     PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
    1966 ECB             :             else
    1967 GIC      273185 :                 leafDatums[spgKeyColumn] = datums[spgKeyColumn];
    1968 ECB             :         }
    1969                 :     }
    1970                 :     else
    1971 GIC          45 :         leafDatums[spgKeyColumn] = (Datum) 0;
    1972 ECB             : 
    1973                 :     /* Likewise, ensure that any INCLUDE values are not toasted */
    1974 GIC      449437 :     for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
    1975 ECB             :     {
    1976 GIC       46253 :         if (!isnulls[i])
    1977 ECB             :         {
    1978 GIC       42806 :             if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
    1979 CBC        6622 :                 leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
    1980 ECB             :             else
    1981 GIC       36184 :                 leafDatums[i] = datums[i];
    1982 ECB             :         }
    1983                 :         else
    1984 GIC        3447 :             leafDatums[i] = (Datum) 0;
    1985 ECB             :     }
    1986                 : 
    1987                 :     /*
    1988                 :      * Compute space needed for a leaf tuple containing the given data.
    1989                 :      */
    1990 GIC      403184 :     leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
    1991 ECB             :     /* Account for an item pointer, too */
    1992 GIC      403184 :     leafSize += sizeof(ItemIdData);
    1993 ECB             : 
    1994                 :     /*
    1995                 :      * If it isn't gonna fit, and the opclass can't reduce the datum size by
    1996                 :      * suffixing, bail out now rather than doing a lot of useless work.
    1997                 :      */
    1998 GIC      403184 :     if (leafSize > SPGIST_PAGE_CAPACITY &&
    1999 CBC           2 :         (isnull || !state->config.longValuesOK))
    2000 LBC           0 :         ereport(ERROR,
    2001 EUB             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2002                 :                  errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
    2003                 :                         leafSize - sizeof(ItemIdData),
    2004                 :                         SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
    2005                 :                         RelationGetRelationName(index)),
    2006                 :                  errhint("Values larger than a buffer page cannot be indexed.")));
    2007 GIC      403184 :     bestLeafSize = leafSize;
    2008 ECB             : 
    2009                 :     /* Initialize "current" to the appropriate root page */
    2010 GIC      403184 :     current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
    2011 CBC      403184 :     current.buffer = InvalidBuffer;
    2012          403184 :     current.page = NULL;
    2013          403184 :     current.offnum = FirstOffsetNumber;
    2014          403184 :     current.node = -1;
    2015 ECB             : 
    2016                 :     /* "parent" is invalid for the moment */
    2017 GIC      403184 :     parent.blkno = InvalidBlockNumber;
    2018 CBC      403184 :     parent.buffer = InvalidBuffer;
    2019          403184 :     parent.page = NULL;
    2020          403184 :     parent.offnum = InvalidOffsetNumber;
    2021          403184 :     parent.node = -1;
    2022 ECB             : 
    2023                 :     /*
    2024                 :      * Before entering the loop, try to clear any pending interrupt condition.
    2025                 :      * If a query cancel is pending, we might as well accept it now not later;
    2026                 :      * while if a non-canceling condition is pending, servicing it here avoids
    2027                 :      * having to restart the insertion and redo all the work so far.
    2028                 :      */
    2029 GIC      403184 :     CHECK_FOR_INTERRUPTS();
    2030 ECB             : 
    2031                 :     for (;;)
    2032 GIC     9331578 :     {
    2033 CBC     9734762 :         bool        isNew = false;
    2034 ECB             : 
    2035                 :         /*
    2036                 :          * Bail out if query cancel is pending.  We must have this somewhere
    2037                 :          * in the loop since a broken opclass could produce an infinite
    2038                 :          * picksplit loop.  However, because we'll be holding buffer lock(s)
    2039                 :          * after the first iteration, ProcessInterrupts() wouldn't be able to
    2040                 :          * throw a cancel error here.  Hence, if we see that an interrupt is
    2041                 :          * pending, break out of the loop and deal with the situation below.
    2042                 :          * Set result = false because we must restart the insertion if the
    2043                 :          * interrupt isn't a query-cancel-or-die case.
    2044                 :          */
    2045 GIC     9734762 :         if (INTERRUPTS_PENDING_CONDITION())
    2046 ECB             :         {
    2047 UIC           0 :             result = false;
    2048 UBC           0 :             break;
    2049 EUB             :         }
    2050                 : 
    2051 GIC     9734762 :         if (current.blkno == InvalidBlockNumber)
    2052 ECB             :         {
    2053                 :             /*
    2054                 :              * Create a leaf page.  If leafSize is too large to fit on a page,
    2055                 :              * we won't actually use the page yet, but it simplifies the API
    2056                 :              * for doPickSplit to always have a leaf page at hand; so just
    2057                 :              * quietly limit our request to a page size.
    2058                 :              */
    2059 GIC        1035 :             current.buffer =
    2060 CBC        1035 :                 SpGistGetBuffer(index,
    2061 ECB             :                                 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
    2062 GIC        1035 :                                 Min(leafSize, SPGIST_PAGE_CAPACITY),
    2063 ECB             :                                 &isNew);
    2064 GIC        1035 :             current.blkno = BufferGetBlockNumber(current.buffer);
    2065 ECB             :         }
    2066 GIC     9733727 :         else if (parent.buffer == InvalidBuffer)
    2067 ECB             :         {
    2068                 :             /* we hold no parent-page lock, so no deadlock is possible */
    2069 GIC      403184 :             current.buffer = ReadBuffer(index, current.blkno);
    2070 CBC      403184 :             LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
    2071 ECB             :         }
    2072 GIC     9330543 :         else if (current.blkno != parent.blkno)
    2073 ECB             :         {
    2074                 :             /* descend to a new child page */
    2075 GIC      786068 :             current.buffer = ReadBuffer(index, current.blkno);
    2076 ECB             : 
    2077                 :             /*
    2078                 :              * Attempt to acquire lock on child page.  We must beware of
    2079                 :              * deadlock against another insertion process descending from that
    2080                 :              * page to our parent page (see README).  If we fail to get lock,
    2081                 :              * abandon the insertion and tell our caller to start over.
    2082                 :              *
    2083                 :              * XXX this could be improved, because failing to get lock on a
    2084                 :              * buffer is not proof of a deadlock situation; the lock might be
    2085                 :              * held by a reader, or even just background writer/checkpointer
    2086                 :              * process.  Perhaps it'd be worth retrying after sleeping a bit?
    2087                 :              */
    2088 GIC      786068 :             if (!ConditionalLockBuffer(current.buffer))
    2089 ECB             :             {
    2090 GIC         598 :                 ReleaseBuffer(current.buffer);
    2091 CBC         598 :                 UnlockReleaseBuffer(parent.buffer);
    2092             598 :                 return false;
    2093 ECB             :             }
    2094                 :         }
    2095                 :         else
    2096                 :         {
    2097                 :             /* inner tuple can be stored on the same page as parent one */
    2098 GIC     8544475 :             current.buffer = parent.buffer;
    2099 ECB             :         }
    2100 GIC     9734164 :         current.page = BufferGetPage(current.buffer);
    2101 ECB             : 
    2102                 :         /* should not arrive at a page of the wrong type */
    2103 GIC    19468283 :         if (isnull ? !SpGistPageStoresNulls(current.page) :
    2104 CBC     9734119 :             SpGistPageStoresNulls(current.page))
    2105 LBC           0 :             elog(ERROR, "SPGiST index page %u has wrong nulls flag",
    2106 EUB             :                  current.blkno);
    2107                 : 
    2108 GIC     9734164 :         if (SpGistPageIsLeaf(current.page))
    2109 ECB             :         {
    2110                 :             SpGistLeafTuple leafTuple;
    2111                 :             int         nToSplit,
    2112                 :                         sizeToSplit;
    2113                 : 
    2114 GIC      402721 :             leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
    2115 CBC      805442 :             if (leafTuple->size + sizeof(ItemIdData) <=
    2116          402721 :                 SpGistPageGetFreeSpace(current.page, 1))
    2117 ECB             :             {
    2118                 :                 /* it fits on page, so insert it and we're done */
    2119 GIC      398773 :                 addLeafTuple(index, state, leafTuple,
    2120 ECB             :                              &current, &parent, isnull, isNew);
    2121 GIC      402584 :                 break;
    2122 ECB             :             }
    2123 GIC        3948 :             else if ((sizeToSplit =
    2124 CBC        3948 :                       checkSplitConditions(index, state, &current,
    2125            1790 :                                            &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
    2126            1790 :                      nToSplit < 64 &&
    2127            1132 :                      leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
    2128 ECB             :             {
    2129                 :                 /*
    2130                 :                  * the amount of data is pretty small, so just move the whole
    2131                 :                  * chain to another leaf page rather than splitting it.
    2132                 :                  */
    2133 GIC        1108 :                 Assert(!isNew);
    2134 CBC        1108 :                 moveLeafs(index, state, &current, &parent, leafTuple, isnull);
    2135            1108 :                 break;          /* we're done */
    2136 ECB             :             }
    2137                 :             else
    2138                 :             {
    2139                 :                 /* picksplit */
    2140 GIC        2840 :                 if (doPickSplit(index, state, &current, &parent,
    2141 ECB             :                                 leafTuple, level, isnull, isNew))
    2142 GIC        2703 :                     break;      /* doPickSplit installed new tuples */
    2143 ECB             : 
    2144                 :                 /* leaf tuple will not be inserted yet */
    2145 GIC         137 :                 pfree(leafTuple);
    2146 ECB             : 
    2147                 :                 /*
    2148                 :                  * current now describes new inner tuple, go insert into it
    2149                 :                  */
    2150 GIC         137 :                 Assert(!SpGistPageIsLeaf(current.page));
    2151 CBC         137 :                 goto process_inner_tuple;
    2152 ECB             :             }
    2153                 :         }
    2154                 :         else                    /* non-leaf page */
    2155                 :         {
    2156                 :             /*
    2157                 :              * Apply the opclass choose function to figure out how to insert
    2158                 :              * the given datum into the current inner tuple.
    2159                 :              */
    2160                 :             SpGistInnerTuple innerTuple;
    2161                 :             spgChooseIn in;
    2162                 :             spgChooseOut out;
    2163                 : 
    2164                 :             /*
    2165                 :              * spgAddNode and spgSplitTuple cases will loop back to here to
    2166                 :              * complete the insertion operation.  Just in case the choose
    2167                 :              * function is broken and produces add or split requests
    2168                 :              * repeatedly, check for query cancel (see comments above).
    2169                 :              */
    2170 GIC     9331580 :     process_inner_tuple:
    2171 CBC     9332684 :             if (INTERRUPTS_PENDING_CONDITION())
    2172 ECB             :             {
    2173 UIC           0 :                 result = false;
    2174 UBC           0 :                 break;
    2175 EUB             :             }
    2176                 : 
    2177 GIC     9332684 :             innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
    2178 CBC     9332684 :                                                         PageGetItemId(current.page, current.offnum));
    2179 ECB             : 
    2180 GIC     9332684 :             in.datum = datums[spgKeyColumn];
    2181 CBC     9332684 :             in.leafDatum = leafDatums[spgKeyColumn];
    2182         9332684 :             in.level = level;
    2183         9332684 :             in.allTheSame = innerTuple->allTheSame;
    2184         9332684 :             in.hasPrefix = (innerTuple->prefixSize > 0);
    2185         9332684 :             in.prefixDatum = SGITDATUM(innerTuple, state);
    2186         9332684 :             in.nNodes = innerTuple->nNodes;
    2187         9332684 :             in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
    2188 ECB             : 
    2189 GIC     9332684 :             memset(&out, 0, sizeof(out));
    2190 ECB             : 
    2191 GIC     9332684 :             if (!isnull)
    2192 ECB             :             {
    2193                 :                 /* use user-defined choose method */
    2194 GIC     9332684 :                 FunctionCall2Coll(procinfo,
    2195 CBC     9332684 :                                   index->rd_indcollation[0],
    2196 ECB             :                                   PointerGetDatum(&in),
    2197                 :                                   PointerGetDatum(&out));
    2198                 :             }
    2199                 :             else
    2200                 :             {
    2201                 :                 /* force "match" action (to insert to random subnode) */
    2202 UIC           0 :                 out.resultType = spgMatchNode;
    2203 EUB             :             }
    2204                 : 
    2205 GIC     9332684 :             if (innerTuple->allTheSame)
    2206 ECB             :             {
    2207                 :                 /*
    2208                 :                  * It's not allowed to do an AddNode at an allTheSame tuple.
    2209                 :                  * Opclass must say "match", in which case we choose a random
    2210                 :                  * one of the nodes to descend into, or "split".
    2211                 :                  */
    2212 GIC       71207 :                 if (out.resultType == spgAddNode)
    2213 LBC           0 :                     elog(ERROR, "cannot add a node to an allTheSame inner tuple");
    2214 GBC       71207 :                 else if (out.resultType == spgMatchNode)
    2215 CBC       71201 :                     out.result.matchNode.nodeN =
    2216           71201 :                         pg_prng_uint64_range(&pg_global_prng_state,
    2217           71201 :                                              0, innerTuple->nNodes - 1);
    2218 ECB             :             }
    2219                 : 
    2220 GIC     9332684 :             switch (out.resultType)
    2221 ECB             :             {
    2222 GIC     9331580 :                 case spgMatchNode:
    2223 ECB             :                     /* Descend to N'th child node */
    2224 GIC     9331580 :                     spgMatchNodeAction(index, state, innerTuple,
    2225 ECB             :                                        &current, &parent,
    2226                 :                                        out.result.matchNode.nodeN);
    2227                 :                     /* Adjust level as per opclass request */
    2228 GIC     9331580 :                     level += out.result.matchNode.levelAdd;
    2229 ECB             :                     /* Replace leafDatum and recompute leafSize */
    2230 GIC     9331580 :                     if (!isnull)
    2231 ECB             :                     {
    2232 GIC     9331580 :                         leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
    2233 CBC     9331580 :                         leafSize = SpGistGetLeafTupleSize(leafDescriptor,
    2234 ECB             :                                                           leafDatums, isnulls);
    2235 GIC     9331580 :                         leafSize += sizeof(ItemIdData);
    2236 ECB             :                     }
    2237                 : 
    2238                 :                     /*
    2239                 :                      * Check new tuple size; fail if it can't fit, unless the
    2240                 :                      * opclass says it can handle the situation by suffixing.
    2241                 :                      *
    2242                 :                      * However, the opclass can only shorten the leaf datum,
    2243                 :                      * which may not be enough to ever make the tuple fit,
    2244                 :                      * since INCLUDE columns might alone use more than a page.
    2245                 :                      * Depending on the opclass' behavior, that could lead to
    2246                 :                      * an infinite loop --- spgtextproc.c, for example, will
    2247                 :                      * just repeatedly generate an empty-string leaf datum
    2248                 :                      * once it runs out of data.  Actual bugs in opclasses
    2249                 :                      * might cause infinite looping, too.  To detect such a
    2250                 :                      * loop, check to see if we are making progress by
    2251                 :                      * reducing the leafSize in each pass.  This is a bit
    2252                 :                      * tricky though.  Because of alignment considerations,
    2253                 :                      * the total tuple size might not decrease on every pass.
    2254                 :                      * Also, there are edge cases where the choose method
    2255                 :                      * might seem to not make progress for a cycle or two.
    2256                 :                      * Somewhat arbitrarily, we allow up to 10 no-progress
    2257                 :                      * iterations before failing.  (This limit should be more
    2258                 :                      * than MAXALIGN, to accommodate opclasses that trim one
    2259                 :                      * byte from the leaf datum per pass.)
    2260                 :                      */
    2261 GIC     9331580 :                     if (leafSize > SPGIST_PAGE_CAPACITY)
    2262 ECB             :                     {
    2263 GIC          26 :                         bool        ok = false;
    2264 ECB             : 
    2265 GIC          26 :                         if (state->config.longValuesOK && !isnull)
    2266 ECB             :                         {
    2267 GIC          26 :                             if (leafSize < bestLeafSize)
    2268 ECB             :                             {
    2269 GIC           2 :                                 ok = true;
    2270 CBC           2 :                                 bestLeafSize = leafSize;
    2271               2 :                                 numNoProgressCycles = 0;
    2272 ECB             :                             }
    2273 GIC          24 :                             else if (++numNoProgressCycles < 10)
    2274 CBC          22 :                                 ok = true;
    2275 ECB             :                         }
    2276 GIC          26 :                         if (!ok)
    2277 CBC           2 :                             ereport(ERROR,
    2278 ECB             :                                     (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2279                 :                                      errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
    2280                 :                                             leafSize - sizeof(ItemIdData),
    2281                 :                                             SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
    2282                 :                                             RelationGetRelationName(index)),
    2283                 :                                      errhint("Values larger than a buffer page cannot be indexed.")));
    2284                 :                     }
    2285                 : 
    2286                 :                     /*
    2287                 :                      * Loop around and attempt to insert the new leafDatum at
    2288                 :                      * "current" (which might reference an existing child
    2289                 :                      * tuple, or might be invalid to force us to find a new
    2290                 :                      * page for the tuple).
    2291                 :                      */
    2292 GIC     9331578 :                     break;
    2293 CBC         783 :                 case spgAddNode:
    2294 ECB             :                     /* AddNode is not sensible if nodes don't have labels */
    2295 GIC         783 :                     if (in.nodeLabels == NULL)
    2296 LBC           0 :                         elog(ERROR, "cannot add a node to an inner tuple without node labels");
    2297 EUB             :                     /* Add node to inner tuple, per request */
    2298 GIC         783 :                     spgAddNodeAction(index, state, innerTuple,
    2299 ECB             :                                      &current, &parent,
    2300                 :                                      out.result.addNode.nodeN,
    2301                 :                                      out.result.addNode.nodeLabel);
    2302                 : 
    2303                 :                     /*
    2304                 :                      * Retry insertion into the enlarged node.  We assume that
    2305                 :                      * we'll get a MatchNode result this time.
    2306                 :                      */
    2307 GIC         783 :                     goto process_inner_tuple;
    2308 ECB             :                     break;
    2309 GIC         321 :                 case spgSplitTuple:
    2310 ECB             :                     /* Split inner tuple, per request */
    2311 GIC         321 :                     spgSplitNodeAction(index, state, innerTuple,
    2312 ECB             :                                        &current, &out);
    2313                 : 
    2314                 :                     /* Retry insertion into the split node */
    2315 GIC         321 :                     goto process_inner_tuple;
    2316 ECB             :                     break;
    2317 UIC           0 :                 default:
    2318 UBC           0 :                     elog(ERROR, "unrecognized SPGiST choose result: %d",
    2319 EUB             :                          (int) out.resultType);
    2320                 :                     break;
    2321                 :             }
    2322                 :         }
    2323                 :     }                           /* end loop */
    2324                 : 
    2325                 :     /*
    2326                 :      * Release any buffers we're still holding.  Beware of possibility that
    2327                 :      * current and parent reference same buffer.
    2328                 :      */
    2329 GIC      402584 :     if (current.buffer != InvalidBuffer)
    2330 ECB             :     {
    2331 GIC      402584 :         SpGistSetLastUsedPage(index, current.buffer);
    2332 CBC      402584 :         UnlockReleaseBuffer(current.buffer);
    2333 ECB             :     }
    2334 GIC      402584 :     if (parent.buffer != InvalidBuffer &&
    2335 CBC      394098 :         parent.buffer != current.buffer)
    2336 ECB             :     {
    2337 GIC      391781 :         SpGistSetLastUsedPage(index, parent.buffer);
    2338 CBC      391781 :         UnlockReleaseBuffer(parent.buffer);
    2339 ECB             :     }
    2340                 : 
    2341                 :     /*
    2342                 :      * We do not support being called while some outer function is holding a
    2343                 :      * buffer lock (or any other reason to postpone query cancels).  If that
    2344                 :      * were the case, telling the caller to retry would create an infinite
    2345                 :      * loop.
    2346                 :      */
    2347 GIC      402584 :     Assert(INTERRUPTS_CAN_BE_PROCESSED());
    2348 ECB             : 
    2349                 :     /*
    2350                 :      * Finally, check for interrupts again.  If there was a query cancel,
    2351                 :      * ProcessInterrupts() will be able to throw the error here.  If it was
    2352                 :      * some other kind of interrupt that can just be cleared, return false to
    2353                 :      * tell our caller to retry.
    2354                 :      */
    2355 GIC      402584 :     CHECK_FOR_INTERRUPTS();
    2356 ECB             : 
    2357 GIC      402584 :     return result;
    2358 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a