LCOV - differential code coverage report
Current view: top level - src/backend/access/heap - hio.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 88.0 % 233 205 7 7 13 1 1 62 101 41 20 108 6 49
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 5 5 2 1 2 2 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * hio.c
       4                 :  *    POSTGRES heap access method input/output code.
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/access/heap/hio.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : 
      16                 : #include "postgres.h"
      17                 : 
      18                 : #include "access/heapam.h"
      19                 : #include "access/hio.h"
      20                 : #include "access/htup_details.h"
      21                 : #include "access/visibilitymap.h"
      22                 : #include "storage/bufmgr.h"
      23                 : #include "storage/freespace.h"
      24                 : #include "storage/lmgr.h"
      25                 : #include "storage/smgr.h"
      26                 : 
      27                 : 
      28                 : /*
      29                 :  * RelationPutHeapTuple - place tuple at specified page
      30                 :  *
      31                 :  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
      32                 :  *
      33                 :  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
      34                 :  */
      35                 : void
      36 CBC    15256234 : RelationPutHeapTuple(Relation relation,
      37                 :                      Buffer buffer,
      38                 :                      HeapTuple tuple,
      39                 :                      bool token)
      40                 : {
      41                 :     Page        pageHeader;
      42                 :     OffsetNumber offnum;
      43                 : 
      44                 :     /*
      45                 :      * A tuple that's being inserted speculatively should already have its
      46                 :      * token set.
      47                 :      */
      48        15256234 :     Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
      49                 : 
      50                 :     /*
      51                 :      * Do not allow tuples with invalid combinations of hint bits to be placed
      52                 :      * on a page.  This combination is detected as corruption by the
      53                 :      * contrib/amcheck logic, so if you disable this assertion, make
      54                 :      * corresponding changes there.
      55                 :      */
      56        15256234 :     Assert(!((tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED) &&
      57                 :              (tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)));
      58                 : 
      59                 :     /* Add the tuple to the page */
      60        15256234 :     pageHeader = BufferGetPage(buffer);
      61                 : 
      62        15256234 :     offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
      63                 :                          tuple->t_len, InvalidOffsetNumber, false, true);
      64                 : 
      65        15256234 :     if (offnum == InvalidOffsetNumber)
      66 UBC           0 :         elog(PANIC, "failed to add tuple to page");
      67                 : 
      68                 :     /* Update tuple->t_self to the actual position where it was stored */
      69 CBC    15256234 :     ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
      70                 : 
      71                 :     /*
      72                 :      * Insert the correct position into CTID of the stored tuple, too (unless
      73                 :      * this is a speculative insertion, in which case the token is held in
      74                 :      * CTID field instead)
      75                 :      */
      76        15256234 :     if (!token)
      77                 :     {
      78        15254221 :         ItemId      itemId = PageGetItemId(pageHeader, offnum);
      79        15254221 :         HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
      80                 : 
      81        15254221 :         item->t_ctid = tuple->t_self;
      82                 :     }
      83        15256234 : }
      84                 : 
      85                 : /*
      86                 :  * Read in a buffer in mode, using bulk-insert strategy if bistate isn't NULL.
      87                 :  */
      88                 : static Buffer
      89        12680066 : ReadBufferBI(Relation relation, BlockNumber targetBlock,
      90                 :              ReadBufferMode mode, BulkInsertState bistate)
      91                 : {
      92                 :     Buffer      buffer;
      93                 : 
      94                 :     /* If not bulk-insert, exactly like ReadBuffer */
      95        12680066 :     if (!bistate)
      96        11559672 :         return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
      97                 :                                   mode, NULL);
      98                 : 
      99                 :     /* If we have the desired block already pinned, re-pin and return it */
     100         1120394 :     if (bistate->current_buf != InvalidBuffer)
     101                 :     {
     102         1120232 :         if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
     103                 :         {
     104                 :             /*
     105                 :              * Currently the LOCK variants are only used for extending
     106                 :              * relation, which should never reach this branch.
     107                 :              */
     108         1112439 :             Assert(mode != RBM_ZERO_AND_LOCK &&
     109                 :                    mode != RBM_ZERO_AND_CLEANUP_LOCK);
     110                 : 
     111         1112439 :             IncrBufferRefCount(bistate->current_buf);
     112         1112439 :             return bistate->current_buf;
     113                 :         }
     114                 :         /* ... else drop the old buffer */
     115            7793 :         ReleaseBuffer(bistate->current_buf);
     116            7793 :         bistate->current_buf = InvalidBuffer;
     117                 :     }
     118                 : 
     119                 :     /* Perform a read using the buffer strategy */
     120            7955 :     buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
     121                 :                                 mode, bistate->strategy);
     122                 : 
     123                 :     /* Save the selected block as target for future inserts */
     124            7955 :     IncrBufferRefCount(buffer);
     125            7955 :     bistate->current_buf = buffer;
     126                 : 
     127            7955 :     return buffer;
     128                 : }
     129                 : 
     130                 : /*
     131                 :  * For each heap page which is all-visible, acquire a pin on the appropriate
     132                 :  * visibility map page, if we haven't already got one.
     133                 :  *
     134                 :  * To avoid complexity in the callers, either buffer1 or buffer2 may be
     135                 :  * InvalidBuffer if only one buffer is involved. For the same reason, block2
     136                 :  * may be smaller than block1.
     137                 :  *
     138                 :  * Returns whether buffer locks were temporarily released.
     139                 :  */
     140                 : static bool
     141 GIC    12887067 : GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
     142                 :                      BlockNumber block1, BlockNumber block2,
     143 ECB             :                      Buffer *vmbuffer1, Buffer *vmbuffer2)
     144                 : {
     145                 :     bool        need_to_pin_buffer1;
     146                 :     bool        need_to_pin_buffer2;
     147 GNC    12887067 :     bool        released_locks = false;
     148                 : 
     149                 :     /*
     150                 :      * Swap buffers around to handle case of a single block/buffer, and to
     151                 :      * handle if lock ordering rules require to lock block2 first.
     152                 :      */
     153        25773775 :     if (!BufferIsValid(buffer1) ||
     154        13093350 :         (BufferIsValid(buffer2) && block1 > block2))
     155                 :     {
     156          195115 :         Buffer      tmpbuf = buffer1;
     157          195115 :         Buffer     *tmpvmbuf = vmbuffer1;
     158          195115 :         BlockNumber tmpblock = block1;
     159                 : 
     160          195115 :         buffer1 = buffer2;
     161          195115 :         vmbuffer1 = vmbuffer2;
     162          195115 :         block1 = block2;
     163                 : 
     164          195115 :         buffer2 = tmpbuf;
     165          195115 :         vmbuffer2 = tmpvmbuf;
     166          195115 :         block2 = tmpblock;
     167                 :     }
     168                 : 
     169 GIC    12887067 :     Assert(BufferIsValid(buffer1));
     170 CBC    12887067 :     Assert(buffer2 == InvalidBuffer || block1 <= block2);
     171                 : 
     172                 :     while (1)
     173                 :     {
     174                 :         /* Figure out which pins we need but don't have. */
     175 GIC    12887067 :         need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
     176 CBC    12887067 :             && !visibilitymap_pin_ok(block1, *vmbuffer1);
     177        12887067 :         need_to_pin_buffer2 = buffer2 != InvalidBuffer
     178 GIC      206642 :             && PageIsAllVisible(BufferGetPage(buffer2))
     179 CBC    13093709 :             && !visibilitymap_pin_ok(block2, *vmbuffer2);
     180        12887067 :         if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
     181 GNC    12887067 :             break;
     182                 : 
     183 ECB             :         /* We must unlock both buffers before doing any I/O. */
     184 UNC           0 :         released_locks = true;
     185 LBC           0 :         LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
     186               0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     187 UIC           0 :             LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
     188 ECB             : 
     189                 :         /* Get pins. */
     190 LBC           0 :         if (need_to_pin_buffer1)
     191 UIC           0 :             visibilitymap_pin(relation, block1, vmbuffer1);
     192               0 :         if (need_to_pin_buffer2)
     193 LBC           0 :             visibilitymap_pin(relation, block2, vmbuffer2);
     194 ECB             : 
     195                 :         /* Relock buffers. */
     196 UIC           0 :         LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
     197               0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     198               0 :             LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
     199 ECB             : 
     200                 :         /*
     201                 :          * If there are two buffers involved and we pinned just one of them,
     202                 :          * it's possible that the second one became all-visible while we were
     203                 :          * busy pinning the first one.  If it looks like that's a possible
     204                 :          * scenario, we'll need to make a second pass through this loop.
     205                 :          */
     206 UIC           0 :         if (buffer2 == InvalidBuffer || buffer1 == buffer2
     207               0 :             || (need_to_pin_buffer1 && need_to_pin_buffer2))
     208 EUB             :             break;
     209                 :     }
     210                 : 
     211 GNC    12887067 :     return released_locks;
     212 EUB             : }
     213                 : 
     214                 : /*
     215                 :  * Extend the relation. By multiple pages, if beneficial.
     216                 :  *
     217                 :  * If the caller needs multiple pages (num_pages > 1), we always try to extend
     218                 :  * by at least that much.
     219                 :  *
     220                 :  * If there is contention on the extension lock, we don't just extend "for
     221                 :  * ourselves", but we try to help others. We can do so by adding empty pages
     222                 :  * into the FSM. Typically there is no contention when we can't use the FSM.
     223                 :  *
     224                 :  * We do have to limit the number of pages to extend by to some value, as the
     225                 :  * buffers for all the extended pages need to, temporarily, be pinned. For now
     226                 :  * we define MAX_BUFFERS_TO_EXTEND_BY to be 64 buffers, it's hard to see
     227                 :  * benefits with higher numbers. This partially is because copyfrom.c's
     228                 :  * MAX_BUFFERED_TUPLES / MAX_BUFFERED_BYTES prevents larger multi_inserts.
     229                 :  *
     230                 :  * Returns a buffer for a newly extended block. If possible, the buffer is
     231                 :  * returned exclusively locked. *did_unlock is set to true if the lock had to
     232                 :  * be released, false otherwise.
     233                 :  *
     234                 :  *
     235                 :  * XXX: It would likely be beneficial for some workloads to extend more
     236                 :  * aggressively, e.g. using a heuristic based on the relation size.
     237                 :  */
     238                 : static Buffer
     239 GNC      215347 : RelationAddBlocks(Relation relation, BulkInsertState bistate,
     240                 :                   int num_pages, bool use_fsm, bool *did_unlock)
     241 EUB             : {
     242                 : #define MAX_BUFFERS_TO_EXTEND_BY 64
     243                 :     Buffer      victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
     244 GNC      215347 :     BlockNumber first_block = InvalidBlockNumber;
     245          215347 :     BlockNumber last_block = InvalidBlockNumber;
     246                 :     uint32      extend_by_pages;
     247                 :     uint32      not_in_fsm_pages;
     248                 :     Buffer      buffer;
     249                 :     Page        page;
     250 EUB             : 
     251                 :     /*
     252                 :      * Determine by how many pages to try to extend by.
     253                 :      */
     254 GNC      215347 :     if (bistate == NULL && !use_fsm)
     255                 :     {
     256                 :         /*
     257                 :          * If we have neither bistate, nor can use the FSM, we can't bulk
     258                 :          * extend - there'd be no way to find the additional pages.
     259                 :          */
     260             167 :         extend_by_pages = 1;
     261                 :     }
     262                 :     else
     263                 :     {
     264                 :         uint32      waitcount;
     265                 : 
     266                 :         /*
     267                 :          * Try to extend at least by the number of pages the caller needs. We
     268                 :          * can remember the additional pages (either via FSM or bistate).
     269                 :          */
     270          215180 :         extend_by_pages = num_pages;
     271                 : 
     272          215180 :         if (!RELATION_IS_LOCAL(relation))
     273          114049 :             waitcount = RelationExtensionLockWaiterCount(relation);
     274                 :         else
     275          101131 :             waitcount = 0;
     276                 : 
     277                 :         /*
     278                 :          * Multiply the number of pages to extend by the number of waiters. Do
     279                 :          * this even if we're not using the FSM, as it still relieves
     280                 :          * contention, by deferring the next time this backend needs to
     281                 :          * extend. In that case the extended pages will be found via
     282                 :          * bistate->next_free.
     283                 :          */
     284          215180 :         extend_by_pages += extend_by_pages * waitcount;
     285                 : 
     286                 :         /*
     287                 :          * Can't extend by more than MAX_BUFFERS_TO_EXTEND_BY, we need to pin
     288                 :          * them all concurrently.
     289                 :          */
     290          215180 :         extend_by_pages = Min(extend_by_pages, MAX_BUFFERS_TO_EXTEND_BY);
     291 ECB             :     }
     292                 : 
     293                 :     /*
     294                 :      * How many of the extended pages should be entered into the FSM?
     295                 :      *
     296                 :      * If we have a bistate, only enter pages that we don't need ourselves
     297                 :      * into the FSM.  Otherwise every other backend will immediately try to
     298                 :      * use the pages this backend needs for itself, causing unnecessary
     299                 :      * contention.  If we don't have a bistate, we can't avoid the FSM.
     300                 :      *
     301                 :      * Never enter the page returned into the FSM, we'll immediately use it.
     302                 :      */
     303 GNC      215347 :     if (num_pages > 1 && bistate == NULL)
     304             248 :         not_in_fsm_pages = 1;
     305                 :     else
     306          215099 :         not_in_fsm_pages = num_pages;
     307                 : 
     308                 :     /* prepare to put another buffer into the bistate */
     309          215347 :     if (bistate && bistate->current_buf != InvalidBuffer)
     310                 :     {
     311           10440 :         ReleaseBuffer(bistate->current_buf);
     312           10440 :         bistate->current_buf = InvalidBuffer;
     313                 :     }
     314                 : 
     315                 :     /*
     316                 :      * Extend the relation. We ask for the first returned page to be locked,
     317                 :      * so that we are sure that nobody has inserted into the page
     318                 :      * concurrently.
     319                 :      *
     320                 :      * With the current MAX_BUFFERS_TO_EXTEND_BY there's no danger of
     321                 :      * [auto]vacuum trying to truncate later pages as REL_TRUNCATE_MINIMUM is
     322                 :      * way larger.
     323                 :      */
     324          215347 :     first_block = ExtendBufferedRelBy(EB_REL(relation), MAIN_FORKNUM,
     325                 :                                       bistate ? bistate->strategy : NULL,
     326                 :                                       EB_LOCK_FIRST,
     327                 :                                       extend_by_pages,
     328                 :                                       victim_buffers,
     329                 :                                       &extend_by_pages);
     330          215347 :     buffer = victim_buffers[0]; /* the buffer the function will return */
     331          215347 :     last_block = first_block + (extend_by_pages - 1);
     332          215347 :     Assert(first_block == BufferGetBlockNumber(buffer));
     333                 : 
     334                 :     /*
     335                 :      * Relation is now extended. Initialize the page. We do this here, before
     336                 :      * potentially releasing the lock on the page, because it allows us to
     337                 :      * double check that the page contents are empty (this should never
     338                 :      * happen, but if it does we don't want to risk wiping out valid data).
     339                 :      */
     340          215347 :     page = BufferGetPage(buffer);
     341          215347 :     if (!PageIsNew(page))
     342 UNC           0 :         elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
     343                 :              first_block,
     344                 :              RelationGetRelationName(relation));
     345                 : 
     346 GNC      215347 :     PageInit(page, BufferGetPageSize(buffer), 0);
     347          215347 :     MarkBufferDirty(buffer);
     348                 : 
     349                 :     /*
     350                 :      * If we decided to put pages into the FSM, release the buffer lock (but
     351                 :      * not pin), we don't want to do IO while holding a buffer lock. This will
     352                 :      * necessitate a bit more extensive checking in our caller.
     353                 :      */
     354          215347 :     if (use_fsm && not_in_fsm_pages < extend_by_pages)
     355                 :     {
     356             249 :         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     357             249 :         *did_unlock = true;
     358                 :     }
     359                 :     else
     360          215098 :         *did_unlock = false;
     361                 : 
     362                 :     /*
     363                 :      * Relation is now extended. Release pins on all buffers, except for the
     364                 :      * first (which we'll return).  If we decided to put pages into the FSM,
     365                 :      * we can do that as part of the same loop.
     366                 :      */
     367          224616 :     for (uint32 i = 1; i < extend_by_pages; i++)
     368                 :     {
     369            9269 :         BlockNumber curBlock = first_block + i;
     370                 : 
     371            9269 :         Assert(curBlock == BufferGetBlockNumber(victim_buffers[i]));
     372            9269 :         Assert(BlockNumberIsValid(curBlock));
     373                 : 
     374            9269 :         ReleaseBuffer(victim_buffers[i]);
     375                 : 
     376            9269 :         if (use_fsm && i >= not_in_fsm_pages)
     377                 :         {
     378            1504 :             Size        freespace = BufferGetPageSize(victim_buffers[i]) -
     379                 :             SizeOfPageHeaderData;
     380                 : 
     381            1504 :             RecordPageWithFreeSpace(relation, curBlock, freespace);
     382                 :         }
     383                 :     }
     384                 : 
     385          215347 :     if (use_fsm && not_in_fsm_pages < extend_by_pages)
     386                 :     {
     387             249 :         BlockNumber first_fsm_block = first_block + not_in_fsm_pages;
     388                 : 
     389             249 :         FreeSpaceMapVacuumRange(relation, first_fsm_block, last_block);
     390                 :     }
     391                 : 
     392          215347 :     if (bistate)
     393                 :     {
     394                 :         /*
     395                 :          * Remember the additional pages we extended by, so we later can use
     396                 :          * them without looking into the FSM.
     397                 :          */
     398           12073 :         if (extend_by_pages > 1)
     399                 :         {
     400             977 :             bistate->next_free = first_block + 1;
     401             977 :             bistate->last_free = last_block;
     402                 :         }
     403                 :         else
     404                 :         {
     405           11096 :             bistate->next_free = InvalidBlockNumber;
     406           11096 :             bistate->last_free = InvalidBlockNumber;
     407                 :         }
     408                 : 
     409                 :         /* maintain bistate->current_buf */
     410           12073 :         IncrBufferRefCount(buffer);
     411           12073 :         bistate->current_buf = buffer;
     412                 :     }
     413                 : 
     414          215347 :     return buffer;
     415                 : #undef MAX_BUFFERS_TO_EXTEND_BY
     416 ECB             : }
     417                 : 
     418                 : /*
     419                 :  * RelationGetBufferForTuple
     420                 :  *
     421                 :  *  Returns pinned and exclusive-locked buffer of a page in given relation
     422                 :  *  with free space >= given len.
     423                 :  *
     424                 :  *  If num_pages is > 1, we will try to extend the relation by at least that
     425                 :  *  many pages when we decide to extend the relation. This is more efficient
     426                 :  *  for callers that know they will need multiple pages
     427                 :  *  (e.g. heap_multi_insert()).
     428                 :  *
     429                 :  *  If otherBuffer is not InvalidBuffer, then it references a previously
     430                 :  *  pinned buffer of another page in the same relation; on return, this
     431                 :  *  buffer will also be exclusive-locked.  (This case is used by heap_update;
     432                 :  *  the otherBuffer contains the tuple being updated.)
     433                 :  *
     434                 :  *  The reason for passing otherBuffer is that if two backends are doing
     435                 :  *  concurrent heap_update operations, a deadlock could occur if they try
     436                 :  *  to lock the same two buffers in opposite orders.  To ensure that this
     437                 :  *  can't happen, we impose the rule that buffers of a relation must be
     438                 :  *  locked in increasing page number order.  This is most conveniently done
     439                 :  *  by having RelationGetBufferForTuple lock them both, with suitable care
     440                 :  *  for ordering.
     441                 :  *
     442                 :  *  NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
     443                 :  *  same buffer we select for insertion of the new tuple (this could only
     444                 :  *  happen if space is freed in that page after heap_update finds there's not
     445                 :  *  enough there).  In that case, the page will be pinned and locked only once.
     446                 :  *
     447                 :  *  We also handle the possibility that the all-visible flag will need to be
     448                 :  *  cleared on one or both pages.  If so, pin on the associated visibility map
     449                 :  *  page must be acquired before acquiring buffer lock(s), to avoid possibly
     450                 :  *  doing I/O while holding buffer locks.  The pins are passed back to the
     451                 :  *  caller using the input-output arguments vmbuffer and vmbuffer_other.
     452                 :  *  Note that in some cases the caller might have already acquired such pins,
     453                 :  *  which is indicated by these arguments not being InvalidBuffer on entry.
     454                 :  *
     455                 :  *  We normally use FSM to help us find free space.  However,
     456                 :  *  if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
     457                 :  *  the end of the relation if the tuple won't fit on the current target page.
     458                 :  *  This can save some cycles when we know the relation is new and doesn't
     459                 :  *  contain useful amounts of free space.
     460                 :  *
     461                 :  *  HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
     462                 :  *  relation, if the caller holds exclusive lock and is careful to invalidate
     463                 :  *  relation's smgr_targblock before the first insertion --- that ensures that
     464                 :  *  all insertions will occur into newly added pages and not be intermixed
     465                 :  *  with tuples from other transactions.  That way, a crash can't risk losing
     466                 :  *  any committed data of other transactions.  (See heap_insert's comments
     467                 :  *  for additional constraints needed for safe usage of this behavior.)
     468                 :  *
     469                 :  *  The caller can also provide a BulkInsertState object to optimize many
     470                 :  *  insertions into the same relation.  This keeps a pin on the current
     471                 :  *  insertion target page (to save pin/unpin cycles) and also passes a
     472                 :  *  BULKWRITE buffer selection strategy object to the buffer manager.
     473                 :  *  Passing NULL for bistate selects the default behavior.
     474                 :  *
     475                 :  *  We don't fill existing pages further than the fillfactor, except for large
     476                 :  *  tuples in nearly-empty pages.  This is OK since this routine is not
     477                 :  *  consulted when updating a tuple and keeping it on the same page, which is
     478                 :  *  the scenario fillfactor is meant to reserve space for.
     479                 :  *
     480                 :  *  ereport(ERROR) is allowed here, so this routine *must* be called
     481                 :  *  before any (unlogged) changes are made in buffer pool.
     482                 :  */
     483                 : Buffer
     484 GIC    12869513 : RelationGetBufferForTuple(Relation relation, Size len,
     485                 :                           Buffer otherBuffer, int options,
     486                 :                           BulkInsertState bistate,
     487                 :                           Buffer *vmbuffer, Buffer *vmbuffer_other,
     488                 :                           int num_pages)
     489                 : {
     490        12869513 :     bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM); /* FSM is consulted unless caller asked to skip it */
     491        12869513 :     Buffer      buffer = InvalidBuffer;
     492 ECB             :     Page        page;
     493                 :     Size        nearlyEmptyFreeSpace,
     494 GBC    12869513 :                 pageFreeSpace = 0,
     495 GIC    12869513 :                 saveFreeSpace = 0,
     496        12869513 :                 targetFreeSpace = 0;
     497                 :     BlockNumber targetBlock,
     498 ECB             :                 otherBlock;
     499                 :     bool        unlockedTargetBuffer;
     500                 :     bool        recheckVmPins;
     501                 : 
     502 GIC    12869513 :     len = MAXALIGN(len);        /* be conservative */
     503                 : 
     504                 :     /* if the caller doesn't know by how many pages to extend, extend by 1 */
     505 GNC    12869513 :     if (num_pages <= 0)
     506        12032039 :         num_pages = 1;
     507                 : 
     508                 :     /* Bulk insert is not supported for updates, only inserts. */
     509 GIC    12869513 :     Assert(otherBuffer == InvalidBuffer || !bistate);
     510                 : 
     511 ECB             :     /*
     512                 :      * If we're going to fail for an oversize tuple, do it right away
     513                 :      */
     514 CBC    12869513 :     if (len > MaxHeapTupleSize)
     515 UIC           0 :         ereport(ERROR,
     516                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     517 ECB             :                  errmsg("row is too big: size %zu, maximum size %zu",
     518                 :                         len, MaxHeapTupleSize)));
     519                 : 
     520                 :     /* Compute desired extra freespace due to fillfactor option */
     521 GIC    12869513 :     saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
     522                 :                                                    HEAP_DEFAULT_FILLFACTOR);
     523                 : 
     524 ECB             :     /*
     525                 :      * Since pages without tuples can still have line pointers, we consider
     526                 :      * pages "empty" when the unavailable space is slight.  This threshold is
     527                 :      * somewhat arbitrary, but it should prevent most unnecessary relation
     528                 :      * extensions while inserting large tuples into low-fillfactor tables.
     529                 :      */
     530 GIC    12869513 :     nearlyEmptyFreeSpace = MaxHeapTupleSize -
     531 ECB             :         (MaxHeapTuplesPerPage / 8 * sizeof(ItemIdData));
     532 GIC    12869513 :     if (len + saveFreeSpace > nearlyEmptyFreeSpace)
     533 CBC          36 :         targetFreeSpace = Max(len, nearlyEmptyFreeSpace);
     534                 :     else
     535        12869477 :         targetFreeSpace = len + saveFreeSpace;
     536                 : 
     537 GIC    12869513 :     if (otherBuffer != InvalidBuffer)
     538 CBC      196178 :         otherBlock = BufferGetBlockNumber(otherBuffer);
     539                 :     else
     540 GIC    12673335 :         otherBlock = InvalidBlockNumber;    /* just to keep compiler quiet */
     541                 : 
     542 ECB             :     /*
     543                 :      * We first try to put the tuple on the same page we last inserted a tuple
     544                 :      * on, as cached in the BulkInsertState or relcache entry.  If that
     545                 :      * doesn't work, we ask the Free Space Map to locate a suitable page.
     546                 :      * Since the FSM's info might be out of date, we have to be prepared to
     547                 :      * loop around and retry multiple times. (To ensure this isn't an infinite
     548                 :      * loop, we must update the FSM with the correct amount of free space on
     549                 :      * each page that proves not to be suitable.)  If the FSM has no record of
     550                 :      * a page with enough free space, we give up and extend the relation.
     551                 :      *
     552                 :      * When use_fsm is false, we either put the tuple onto the existing target
     553                 :      * page or extend the relation.
     554                 :      */
     555 CBC    12869513 :     if (bistate && bistate->current_buf != InvalidBuffer)
     556 GIC     1112439 :         targetBlock = BufferGetBlockNumber(bistate->current_buf);
     557 ECB             :     else
     558 CBC    11757074 :         targetBlock = RelationGetTargetBlock(relation);
     559                 : 
     560 GIC    12869513 :     if (targetBlock == InvalidBlockNumber && use_fsm)
     561                 :     {
     562 ECB             :         /*
     563                 :          * We have no cached target page, so ask the FSM for an initial
     564                 :          * target.
     565                 :          */
     566 GIC      120652 :         targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
     567 ECB             :     }
     568                 : 
     569                 :     /*
     570                 :      * If the FSM knows nothing of the rel, try the last page before we give
     571                 :      * up and extend.  This avoids one-tuple-per-page syndrome during
     572                 :      * bootstrapping or in a recently-started system.
     573                 :      */
     574 GIC    12869513 :     if (targetBlock == InvalidBlockNumber)
     575                 :     {
     576          111694 :         BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
     577                 : 
     578          111694 :         if (nblocks > 0)
     579           87859 :             targetBlock = nblocks - 1;
     580                 :     }
     581                 : 
     582        12869513 : loop: /* also re-entered via goto when the page added by extension turns out too full */
     583        13086260 :     while (targetBlock != InvalidBlockNumber)
     584                 :     {
     585                 :         /*
     586                 :          * Read and exclusive-lock the target block, as well as the other
     587                 :          * block if one was given, taking suitable care with lock ordering and
     588                 :          * the possibility they are the same block.
     589                 :          *
     590                 :          * If the page-level all-visible flag is set, caller will need to
     591                 :          * clear both that and the corresponding visibility map bit.  However,
     592                 :          * by the time we return, we'll have x-locked the buffer, and we don't
     593                 :          * want to do any I/O while in that state.  So we check the bit here
     594                 :          * before taking the lock, and pin the page if it appears necessary.
     595                 :          * Checking without the lock creates a risk of getting the wrong
     596                 :          * answer, so we'll have to recheck after acquiring the lock.
     597                 :          */
     598        12879865 :         if (otherBuffer == InvalidBuffer)
     599                 :         {
     600                 :             /* easy case */
     601        12680066 :             buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
     602        12680066 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     603           12083 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     604                 : 
     605                 :             /*
     606                 :              * If the page is empty, pin vmbuffer to set all_frozen bit later.
     607                 :              */
     608        12683540 :             if ((options & HEAP_INSERT_FROZEN) &&
     609            3474 :                 (PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0))
     610            1542 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     611                 : 
     612        12680066 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     613                 :         }
     614          199799 :         else if (otherBlock == targetBlock)
     615                 :         {
     616                 :             /* also easy case */
     617            1461 :             buffer = otherBuffer;
     618            1461 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     619 UIC           0 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     620 GIC        1461 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     621                 :         }
     622          198338 :         else if (otherBlock < targetBlock)
     623                 :         {
     624                 :             /* lock other buffer first */
     625          194756 :             buffer = ReadBuffer(relation, targetBlock);
     626          194756 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     627             323 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     628          194756 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     629          194756 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     630                 :         }
     631                 :         else
     632                 :         {
     633                 :             /* lock target buffer first */
     634            3582 :             buffer = ReadBuffer(relation, targetBlock);
     635            3582 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     636              40 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     637            3582 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     638            3582 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     639                 :         }
     640                 : 
     641 ECB             :         /*
     642                 :          * We now have the target page (and the other buffer, if any) pinned
     643                 :          * and locked.  However, since our initial PageIsAllVisible checks
     644                 :          * were performed before acquiring the lock, the results might now be
     645                 :          * out of date, either for the selected target buffer, or for the
     646                 :          * other buffer passed by the caller.  In that case, we'll need to
     647                 :          * give up our locks, go get the pin(s) we failed to get earlier, and
     648                 :          * re-lock.  That's pretty painful, but hopefully shouldn't happen
     649                 :          * often.
     650                 :          *
     651                 :          * Note that there's a small possibility that we didn't pin the page
     652                 :          * above but still have the correct page pinned anyway, either because
     653                 :          * we've already made a previous pass through this loop, or because
     654                 :          * caller passed us the right page anyway.
     655                 :          *
     656                 :          * Note also that it's possible that by the time we get the pin and
     657                 :          * retake the buffer locks, the visibility map bit will have been
     658                 :          * cleared by some other backend anyway.  In that case, we'll have
     659                 :          * done a bit of extra work for no gain, but there's no real harm
     660                 :          * done.
     661                 :          */
     662 GNC    12879865 :         GetVisibilityMapPins(relation, buffer, otherBuffer,
     663                 :                              targetBlock, otherBlock, vmbuffer,
     664                 :                              vmbuffer_other);
     665                 : 
     666 ECB             :         /*
     667 EUB             :          * Now we can check to see if there's enough free space here. If so,
     668                 :          * we're done.
     669                 :          */
     670 GIC    12879865 :         page = BufferGetPage(buffer);
     671                 : 
     672                 :         /*
     673 ECB             :          * If necessary, initialize the page; it'll be used soon.  We could avoid
     674                 :          * dirtying the buffer here, and rely on the caller to do so whenever
     675                 :          * it puts a tuple onto the page, but there seems not much benefit in
     676                 :          * doing so.
     677                 :          */
     678 GIC    12879865 :         if (PageIsNew(page))
     679                 :         {
     680            9265 :             PageInit(page, BufferGetPageSize(buffer), 0);
     681            9265 :             MarkBufferDirty(buffer);
     682 ECB             :         }
     683                 : 
     684 CBC    12879865 :         pageFreeSpace = PageGetHeapFreeSpace(page);
     685        12879865 :         if (targetFreeSpace <= pageFreeSpace)
     686                 :         {
     687 ECB             :             /* use this page as future insert target, too */
     688 GIC    12654166 :             RelationSetTargetBlock(relation, targetBlock);
     689 CBC    12654166 :             return buffer;
     690 ECB             :         }
     691                 : 
     692                 :         /*
     693                 :          * Not enough space, so we must give up our page locks and pin (if
     694                 :          * any) and prepare to look elsewhere.  We don't care which order we
     695                 :          * unlock the two buffers in, so this can be slightly simpler than the
     696                 :          * code above.
     697                 :          */
     698 GIC      225699 :         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     699          225699 :         if (otherBuffer == InvalidBuffer)
     700          215235 :             ReleaseBuffer(buffer);
     701           10464 :         else if (otherBlock != targetBlock)
     702                 :         {
     703            9003 :             LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     704            9003 :             ReleaseBuffer(buffer);
     705                 :         }
     706                 : 
     707                 :         /* Is there an ongoing bulk extension? */
     708 GNC      225699 :         if (bistate && bistate->next_free != InvalidBlockNumber)
     709                 :         {
     710            7765 :             Assert(bistate->next_free <= bistate->last_free);
     711 ECB             : 
     712                 :             /*
     713                 :              * We bulk extended the relation before, and there are still some
     714                 :              * unused pages from that extension, so we don't need to look in
     715                 :              * the FSM for a new page. But do record the free space from the
     716                 :              * last page, somebody might insert narrower tuples later.
     717                 :              */
     718 GNC        7765 :             if (use_fsm)
     719            6216 :                 RecordPageWithFreeSpace(relation, targetBlock, pageFreeSpace);
     720                 : 
     721            7765 :             targetBlock = bistate->next_free;
     722            7765 :             if (bistate->next_free >= bistate->last_free)
     723                 :             {
     724             977 :                 bistate->next_free = InvalidBlockNumber;
     725             977 :                 bistate->last_free = InvalidBlockNumber;
     726                 :             }
     727                 :             else
     728            6788 :                 bistate->next_free++;
     729                 :         }
     730          217934 :         else if (!use_fsm)
     731                 :         {
     732                 :             /* Without FSM, always fall out of the loop and extend */
     733            8952 :             break;
     734                 :         }
     735                 :         else
     736                 :         {
     737                 :             /*
     738                 :              * Update FSM as to condition of this page, and ask for another
     739                 :              * page to try.
     740                 :              */
     741          208982 :             targetBlock = RecordAndGetPageWithFreeSpace(relation,
     742                 :                                                         targetBlock,
     743                 :                                                         pageFreeSpace,
     744                 :                                                         targetFreeSpace);
     745                 :         }
     746                 :     }
     747                 : 
     748                 :     /* No existing page with enough room was found, so extend the relation */
     749          215347 :     buffer = RelationAddBlocks(relation, bistate, num_pages, use_fsm,
     750                 :                                &unlockedTargetBuffer);
     751                 : 
     752          215347 :     targetBlock = BufferGetBlockNumber(buffer);
     753          215347 :     page = BufferGetPage(buffer);
     754 ECB             : 
     755                 :     /*
     756                 :      * The page is empty, pin vmbuffer to set all_frozen bit. We don't want to
     757                 :      * do IO while the buffer is locked, so we unlock the page first if IO is
     758                 :      * needed (necessitating checks below).
     759                 :      */
     760 GNC      215347 :     if (options & HEAP_INSERT_FROZEN)
     761                 :     {
     762             211 :         Assert(PageGetMaxOffsetNumber(page) == 0);
     763                 : 
     764             211 :         if (!visibilitymap_pin_ok(targetBlock, *vmbuffer))
     765                 :         {
     766             112 :             if (!unlockedTargetBuffer)
     767             112 :                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     768             112 :             unlockedTargetBuffer = true;
     769             112 :             visibilitymap_pin(relation, targetBlock, vmbuffer);
     770 ECB             :         }
     771                 :     }
     772                 : 
     773                 :     /*
     774                 :      * Reacquire locks if necessary.
     775                 :      *
     776                 :      * If the target buffer was unlocked above, or is unlocked while
     777                 :      * reacquiring the lock on otherBuffer below, it's unlikely, but possible,
     778                 :      * that another backend used space on this page. We check for that below,
     779                 :      * and retry if necessary.
     780                 :      */
     781 GNC      215347 :     recheckVmPins = false;
     782          215347 :     if (unlockedTargetBuffer)
     783                 :     {
     784                 :         /* released lock on target buffer above */
     785             361 :         if (otherBuffer != InvalidBuffer)
     786               2 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     787             361 :         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     788             361 :         recheckVmPins = true;
     789                 :     }
     790          214986 :     else if (otherBuffer != InvalidBuffer)
     791                 :     {
     792                 :         /*
     793                 :          * We did not release the target buffer, and otherBuffer is valid,
     794                 :          * need to lock the other buffer. It's guaranteed to be of a lower
     795                 :          * page number than the new page.  To conform with the deadlock
     796                 :          * prevention rules, we ought to lock otherBuffer first, but that would
     797                 :          * give other backends a chance to put tuples on our page. To reduce
     798                 :          * the likelihood of that, attempt to lock the other buffer
     799                 :          * conditionally, that's very likely to work.
     800                 :          *
     801                 :          * Alternatively, we could acquire the lock on otherBuffer before
     802                 :          * extending the relation, but that'd require holding the lock while
     803                 :          * performing IO, which seems worse than an unlikely retry.
     804                 :          */
     805 GIC        6841 :         Assert(otherBuffer != buffer);
     806            6841 :         Assert(targetBlock > otherBlock);
     807                 : 
     808            6841 :         if (unlikely(!ConditionalLockBuffer(otherBuffer)))
     809                 :         {
     810 UNC           0 :             unlockedTargetBuffer = true;
     811 LBC           0 :             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     812 UIC           0 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     813 LBC           0 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     814 ECB             :         }
     815 GNC        6841 :         recheckVmPins = true;
     816                 :     }
     817                 : 
     818                 :     /*
     819                 :      * If one of the buffers was unlocked (always the case if otherBuffer is
     820                 :      * valid), it's possible, although unlikely, that an all-visible flag
     821                 :      * became set.  We can use GetVisibilityMapPins to deal with that. It's
     822                 :      * possible that GetVisibilityMapPins() might need to temporarily release
     823                 :      * buffer locks, in which case we'll need to check if there's still enough
     824                 :      * space on the page below.
     825                 :      */
     826          215347 :     if (recheckVmPins)
     827                 :     {
     828            7202 :         if (GetVisibilityMapPins(relation, otherBuffer, buffer,
     829                 :                                  otherBlock, targetBlock, vmbuffer_other,
     830                 :                                  vmbuffer))
     831 UNC           0 :             unlockedTargetBuffer = true;
     832                 :     }
     833                 : 
     834                 :     /*
     835                 :      * If the target buffer was temporarily unlocked since the relation
     836                 :      * extension, it's possible, although unlikely, that all the space on the
     837                 :      * page was already used. If so, we just retry from the start.  If we
     838                 :      * didn't unlock, something has gone wrong if there's not enough space -
     839                 :      * the test at the top should have prevented reaching this case.
     840                 :      */
     841 GNC      215347 :     pageFreeSpace = PageGetHeapFreeSpace(page);
     842          215347 :     if (len > pageFreeSpace)
     843                 :     {
     844 UNC           0 :         if (unlockedTargetBuffer)
     845 ECB             :         {
     846 UNC           0 :             if (otherBuffer != InvalidBuffer)
     847               0 :                 LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     848 LBC           0 :             UnlockReleaseBuffer(buffer);
     849 ECB             : 
     850 UIC           0 :             goto loop;
     851                 :         }
     852               0 :         elog(PANIC, "tuple is too big: size %zu", len);
     853                 :     }
     854                 : 
     855                 :     /*
     856                 :      * Remember the new page as our target for future insertions.
     857                 :      *
     858                 :      * XXX should we enter the new page into the free space map immediately,
     859 ECB             :      * or just keep it for this backend's exclusive use in the short run
     860                 :      * (until VACUUM sees it)?  Seems to depend on whether you expect the
     861                 :      * current backend to make more insertions or not, which is probably a
     862                 :      * good bet most of the time.  So for now, don't add it to FSM yet.
     863                 :      */
     864 GNC      215347 :     RelationSetTargetBlock(relation, targetBlock);
     865 ECB             : 
     866 CBC      215347 :     return buffer;
     867                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a