LCOV - differential code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 87.8 % 255 224 18 5 7 1 6 66 126 26 18 154 6 39
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 16 16 8 8 14 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * localbuf.c
       4                 :  *    local buffer manager. Fast buffer manager for temporary tables,
       5                 :  *    which never need to be WAL-logged or checkpointed, etc.
       6                 :  *
       7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       8                 :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9                 :  *
      10                 :  *
      11                 :  * IDENTIFICATION
      12                 :  *    src/backend/storage/buffer/localbuf.c
      13                 :  *
      14                 :  *-------------------------------------------------------------------------
      15                 :  */
      16                 : #include "postgres.h"
      17                 : 
      18                 : #include "access/parallel.h"
      19                 : #include "catalog/catalog.h"
      20                 : #include "executor/instrument.h"
      21                 : #include "pgstat.h"
      22                 : #include "storage/buf_internals.h"
      23                 : #include "storage/bufmgr.h"
      24                 : #include "utils/guc_hooks.h"
      25                 : #include "utils/memutils.h"
      26                 : #include "utils/resowner_private.h"
      27                 : 
      28                 : 
      29                 : /*#define LBDEBUG*/
      30                 : 
      31                 : /* entry for buffer lookup hashtable */
      32                 : typedef struct
      33                 : {
      34                 :     BufferTag   key;            /* Tag of a disk page */
      35                 :     int         id;             /* Associated local buffer's index */
      36                 : } LocalBufferLookupEnt;
      37                 : 
      38                 : /* Note: this macro only works on local buffers, not shared ones! */
      39                 : #define LocalBufHdrGetBlock(bufHdr) \
      40                 :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
      41                 : 
      42                 : int         NLocBuffer = 0;     /* until buffers are initialized */
      43                 : 
      44                 : BufferDesc *LocalBufferDescriptors = NULL;
      45                 : Block      *LocalBufferBlockPointers = NULL;
      46                 : int32      *LocalRefCount = NULL;
      47                 : 
      48                 : static int  nextFreeLocalBufId = 0;
      49                 : 
      50                 : static HTAB *LocalBufHash = NULL;
      51                 : 
      52                 : /* number of local buffers pinned at least once */
      53                 : static int  NLocalPinnedBuffers = 0;
      54                 : 
      55                 : 
      56                 : static void InitLocalBuffers(void);
      57                 : static Block GetLocalBufferStorage(void);
      58                 : static Buffer GetLocalVictimBuffer(void);
      59                 : 
      60                 : 
      61                 : /*
      62                 :  * PrefetchLocalBuffer -
      63                 :  *    initiate asynchronous read of a block of a relation
      64                 :  *
      65                 :  * Do PrefetchBuffer's work for temporary relations.
      66                 :  * No-op if prefetching isn't compiled in.
      67                 :  */
      68                 : PrefetchBufferResult
      69 GIC        6244 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      70                 :                     BlockNumber blockNum)
      71                 : {
      72            6244 :     PrefetchBufferResult result = {InvalidBuffer, false};
      73                 :     BufferTag   newTag;         /* identity of requested block */
      74 ECB             :     LocalBufferLookupEnt *hresult;
      75                 : 
      76 GNC        6244 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      77 ECB             : 
      78                 :     /* Initialize local buffers if first request in this session */
      79 GIC        6244 :     if (LocalBufHash == NULL)
      80 UIC           0 :         InitLocalBuffers();
      81 ECB             : 
      82                 :     /* See if the desired buffer already exists */
      83                 :     hresult = (LocalBufferLookupEnt *)
      84 GNC        6244 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      85 EUB             : 
      86 GIC        6244 :     if (hresult)
      87                 :     {
      88                 :         /* Yes, so nothing to do */
      89 CBC        6244 :         result.recent_buffer = -hresult->id - 1;
      90                 :     }
      91 ECB             :     else
      92                 :     {
      93                 : #ifdef USE_PREFETCH
      94                 :         /* Not in buffers, so initiate prefetch */
      95 UNC           0 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      96               0 :             smgrprefetch(smgr, forkNum, blockNum))
      97                 :         {
      98               0 :             result.initiated_io = true;
      99                 :         }
     100                 : #endif                          /* USE_PREFETCH */
     101                 :     }
     102                 : 
     103 GBC        6244 :     return result;
     104 EUB             : }
     105                 : 
     106                 : 
     107                 : /*
     108                 :  * LocalBufferAlloc -
     109                 :  *    Find or create a local buffer for the given page of the given relation.
     110                 :  *
     111 ECB             :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need
     112                 :  * to do any locking since this is all local.   Also, IO_IN_PROGRESS
     113                 :  * does not get set.  Lastly, we support only default access strategy
     114                 :  * (hence, usage_count is always advanced).
     115                 :  */
     116                 : BufferDesc *
     117 GIC     1247393 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     118                 :                  bool *foundPtr)
     119                 : {
     120                 :     BufferTag   newTag;         /* identity of requested block */
     121                 :     LocalBufferLookupEnt *hresult;
     122                 :     BufferDesc *bufHdr;
     123                 :     Buffer      victim_buffer;
     124                 :     int         bufid;
     125 ECB             :     bool        found;
     126                 : 
     127 GNC     1247393 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     128                 : 
     129                 :     /* Initialize local buffers if first request in this session */
     130 GIC     1247393 :     if (LocalBufHash == NULL)
     131              13 :         InitLocalBuffers();
     132                 : 
     133                 :     /* See if the desired buffer already exists */
     134 ECB             :     hresult = (LocalBufferLookupEnt *)
     135 GNC     1247393 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     136                 : 
     137 CBC     1247393 :     if (hresult)
     138 ECB             :     {
     139 GNC     1243603 :         bufid = hresult->id;
     140         1243603 :         bufHdr = GetLocalBufferDescriptor(bufid);
     141         1243603 :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     142 ECB             : 
     143 GNC     1243603 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     144                 :     }
     145                 :     else
     146                 :     {
     147                 :         uint32      buf_state;
     148                 : 
     149            3790 :         victim_buffer = GetLocalVictimBuffer();
     150            3790 :         bufid = -victim_buffer - 1;
     151            3790 :         bufHdr = GetLocalBufferDescriptor(bufid);
     152                 : 
     153                 :         hresult = (LocalBufferLookupEnt *)
     154            3790 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     155            3790 :         if (found)              /* shouldn't happen */
     156 UNC           0 :             elog(ERROR, "local buffer hash table corrupted");
     157 GNC        3790 :         hresult->id = bufid;
     158                 : 
     159                 :         /*
     160                 :          * it's all ours now.
     161                 :          */
     162            3790 :         bufHdr->tag = newTag;
     163                 : 
     164            3790 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     165            3790 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     166            3790 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     167            3790 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     168                 : 
     169            3790 :         *foundPtr = false;
     170                 :     }
     171 ECB             : 
     172 GNC     1247393 :     return bufHdr;
     173                 : }
     174                 : 
     175                 : static Buffer
     176           15296 : GetLocalVictimBuffer(void)
     177                 : {
     178                 :     int         victim_bufid;
     179                 :     int         trycounter;
     180                 :     uint32      buf_state;
     181                 :     BufferDesc *bufHdr;
     182                 : 
     183           15296 :     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
     184                 : 
     185 ECB             :     /*
     186                 :      * Need to get a new buffer.  We use a clock sweep algorithm (essentially
     187                 :      * the same as what freelist.c does now...)
     188                 :      */
     189 GIC       15296 :     trycounter = NLocBuffer;
     190                 :     for (;;)
     191                 :     {
     192 GNC       18503 :         victim_bufid = nextFreeLocalBufId;
     193                 : 
     194           18503 :         if (++nextFreeLocalBufId >= NLocBuffer)
     195              33 :             nextFreeLocalBufId = 0;
     196                 : 
     197           18503 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     198                 : 
     199           18503 :         if (LocalRefCount[victim_bufid] == 0)
     200                 :         {
     201 GIC       18500 :             buf_state = pg_atomic_read_u32(&bufHdr->state);
     202                 : 
     203           18500 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     204                 :             {
     205 CBC        3204 :                 buf_state -= BUF_USAGECOUNT_ONE;
     206 GIC        3204 :                 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     207            3204 :                 trycounter = NLocBuffer;
     208 ECB             :             }
     209                 :             else
     210                 :             {
     211                 :                 /* Found a usable buffer */
     212 GNC       15296 :                 PinLocalBuffer(bufHdr, false);
     213 CBC       15296 :                 break;
     214                 :             }
     215 ECB             :         }
     216 GIC           3 :         else if (--trycounter == 0)
     217 LBC           0 :             ereport(ERROR,
     218                 :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     219 ECB             :                      errmsg("no empty local buffer available")));
     220                 :     }
     221                 : 
     222                 :     /*
     223                 :      * lazy memory allocation: allocate space on first use of a buffer.
     224                 :      */
     225 CBC       15296 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     226                 :     {
     227                 :         /* Set pointer for use by BufferGetBlock() macro */
     228           14051 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     229                 :     }
     230 ECB             : 
     231                 :     /*
     232                 :      * this buffer is not referenced but it might still be dirty. if that's
     233                 :      * the case, write it out before reusing it!
     234                 :      */
     235 GNC       15296 :     if (buf_state & BM_DIRTY)
     236                 :     {
     237                 :         instr_time  io_start;
     238                 :         SMgrRelation oreln;
     239             447 :         Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     240                 : 
     241                 :         /* Find smgr relation for buffer */
     242             447 :         oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyBackendId);
     243                 : 
     244             447 :         PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
     245                 : 
     246             447 :         io_start = pgstat_prepare_io_time();
     247                 : 
     248                 :         /* And write... */
     249             447 :         smgrwrite(oreln,
     250             447 :                   BufTagGetForkNum(&bufHdr->tag),
     251                 :                   bufHdr->tag.blockNum,
     252                 :                   localpage,
     253                 :                   false);
     254                 : 
     255                 :         /* Temporary table I/O does not use Buffer Access Strategies */
     256             447 :         pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     257                 :                                 IOOP_WRITE, io_start, 1);
     258                 : 
     259                 :         /* Mark not-dirty now in case we error out below */
     260             447 :         buf_state &= ~BM_DIRTY;
     261             447 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     262                 : 
     263             447 :         pgBufferUsage.local_blks_written++;
     264                 :     }
     265                 : 
     266                 :     /*
     267                 :      * Remove the victim buffer from the hashtable and mark as invalid.
     268                 :      */
     269 GIC       15296 :     if (buf_state & BM_TAG_VALID)
     270 ECB             :     {
     271                 :         LocalBufferLookupEnt *hresult;
     272                 : 
     273                 :         hresult = (LocalBufferLookupEnt *)
     274 GNC         450 :             hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     275 GIC         450 :         if (!hresult)           /* shouldn't happen */
     276 UIC           0 :             elog(ERROR, "local buffer hash table corrupted");
     277                 :         /* mark buffer invalid just in case hash insert fails */
     278 GNC         450 :         ClearBufferTag(&bufHdr->tag);
     279             450 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     280 GIC         450 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     281 GNC         450 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
     282                 :     }
     283 ECB             : 
     284 GNC       15296 :     return BufferDescriptorGetBuffer(bufHdr);
     285                 : }
     286                 : 
     287                 : /* see LimitAdditionalPins() */
     288                 : static void
     289           10895 : LimitAdditionalLocalPins(uint32 *additional_pins)
     290                 : {
     291                 :     uint32      max_pins;
     292                 : 
     293           10895 :     if (*additional_pins <= 1)
     294           10709 :         return;
     295                 : 
     296                 :     /*
     297                 :      * In contrast to LimitAdditionalPins() other backends don't play a role
     298                 :      * here. We can allow up to NLocBuffer pins in total.
     299 ECB             :      */
     300 GNC         186 :     max_pins = (NLocBuffer - NLocalPinnedBuffers);
     301 EUB             : 
     302 GNC         186 :     if (*additional_pins >= max_pins)
     303 UNC           0 :         *additional_pins = max_pins;
     304                 : }
     305                 : 
     306                 : /*
     307                 :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     308                 :  * temporary buffers.
     309                 :  */
     310                 : BlockNumber
     311 GNC       10895 : ExtendBufferedRelLocal(ExtendBufferedWhat eb,
     312                 :                        ForkNumber fork,
     313                 :                        uint32 flags,
     314                 :                        uint32 extend_by,
     315                 :                        BlockNumber extend_upto,
     316                 :                        Buffer *buffers,
     317                 :                        uint32 *extended_by)
     318                 : {
     319                 :     BlockNumber first_block;
     320                 :     instr_time  io_start;
     321                 : 
     322                 :     /* Initialize local buffers if first request in this session */
     323           10895 :     if (LocalBufHash == NULL)
     324             224 :         InitLocalBuffers();
     325                 : 
     326           10895 :     LimitAdditionalLocalPins(&extend_by);
     327                 : 
     328           22401 :     for (uint32 i = 0; i < extend_by; i++)
     329                 :     {
     330                 :         BufferDesc *buf_hdr;
     331                 :         Block       buf_block;
     332                 : 
     333           11506 :         buffers[i] = GetLocalVictimBuffer();
     334           11506 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     335           11506 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     336                 : 
     337                 :         /* new buffers are zero-filled */
     338           11506 :         MemSet((char *) buf_block, 0, BLCKSZ);
     339                 :     }
     340                 : 
     341           10895 :     first_block = smgrnblocks(eb.smgr, fork);
     342                 : 
     343           10895 :     if (extend_upto != InvalidBlockNumber)
     344                 :     {
     345                 :         /*
     346                 :          * In contrast to shared relations, nothing could change the relation
     347                 :          * size concurrently. Thus we shouldn't end up finding that we don't
     348                 :          * need to do anything.
     349                 :          */
     350             128 :         Assert(first_block <= extend_upto);
     351                 : 
     352             128 :         Assert((uint64) first_block + extend_by <= extend_upto);
     353                 :     }
     354                 : 
     355                 :     /* Fail if relation is already at maximum possible length */
     356           10895 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     357 UNC           0 :         ereport(ERROR,
     358                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     359                 :                  errmsg("cannot extend relation %s beyond %u blocks",
     360                 :                         relpath(eb.smgr->smgr_rlocator, fork),
     361                 :                         MaxBlockNumber)));
     362                 : 
     363 GNC       22401 :     for (int i = 0; i < extend_by; i++)
     364                 :     {
     365                 :         int         victim_buf_id;
     366                 :         BufferDesc *victim_buf_hdr;
     367                 :         BufferTag   tag;
     368                 :         LocalBufferLookupEnt *hresult;
     369                 :         bool        found;
     370                 : 
     371           11506 :         victim_buf_id = -buffers[i] - 1;
     372           11506 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     373                 : 
     374           11506 :         InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
     375                 : 
     376                 :         hresult = (LocalBufferLookupEnt *)
     377           11506 :             hash_search(LocalBufHash, (void *) &tag, HASH_ENTER, &found);
     378           11506 :         if (found)
     379                 :         {
     380 UNC           0 :             BufferDesc *existing_hdr = GetLocalBufferDescriptor(hresult->id);
     381                 :             uint32      buf_state;
     382                 : 
     383               0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     384                 : 
     385               0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     386               0 :             PinLocalBuffer(existing_hdr, false);
     387               0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     388                 : 
     389               0 :             buf_state = pg_atomic_read_u32(&existing_hdr->state);
     390               0 :             Assert(buf_state & BM_TAG_VALID);
     391               0 :             Assert(!(buf_state & BM_DIRTY));
     392               0 :             buf_state &= BM_VALID;
     393               0 :             pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
     394                 :         }
     395                 :         else
     396                 :         {
     397 GNC       11506 :             uint32      buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
     398                 : 
     399           11506 :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
     400                 : 
     401           11506 :             victim_buf_hdr->tag = tag;
     402                 : 
     403           11506 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     404                 : 
     405           11506 :             pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
     406                 : 
     407           11506 :             hresult->id = victim_buf_id;
     408                 :         }
     409                 :     }
     410                 : 
     411           10895 :     io_start = pgstat_prepare_io_time();
     412                 : 
     413                 :     /* actually extend relation */
     414           10895 :     smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
     415                 : 
     416           10895 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     417                 :                             io_start, extend_by);
     418                 : 
     419           22401 :     for (int i = 0; i < extend_by; i++)
     420                 :     {
     421           11506 :         Buffer      buf = buffers[i];
     422                 :         BufferDesc *buf_hdr;
     423                 :         uint32      buf_state;
     424                 : 
     425           11506 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     426                 : 
     427           11506 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     428           11506 :         buf_state |= BM_VALID;
     429           11506 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     430                 :     }
     431                 : 
     432           10895 :     *extended_by = extend_by;
     433                 : 
     434           10895 :     pgBufferUsage.temp_blks_written += extend_by;
     435                 : 
     436           10895 :     return first_block;
     437 ECB             : }
     438                 : 
     439                 : /*
     440                 :  * MarkLocalBufferDirty -
     441                 :  *    mark a local buffer dirty
     442                 :  */
     443                 : void
     444 GIC     1817055 : MarkLocalBufferDirty(Buffer buffer)
     445                 : {
     446                 :     int         bufid;
     447 ECB             :     BufferDesc *bufHdr;
     448                 :     uint32      buf_state;
     449                 : 
     450 GIC     1817055 :     Assert(BufferIsLocal(buffer));
     451 ECB             : 
     452                 : #ifdef LBDEBUG
     453                 :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     454                 : #endif
     455                 : 
     456 GNC     1817055 :     bufid = -buffer - 1;
     457                 : 
     458 CBC     1817055 :     Assert(LocalRefCount[bufid] > 0);
     459                 : 
     460         1817055 :     bufHdr = GetLocalBufferDescriptor(bufid);
     461 EUB             : 
     462 GIC     1817055 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     463                 : 
     464         1817055 :     if (!(buf_state & BM_DIRTY))
     465           11764 :         pgBufferUsage.local_blks_dirtied++;
     466                 : 
     467         1817055 :     buf_state |= BM_DIRTY;
     468                 : 
     469 CBC     1817055 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     470 GIC     1817055 : }
     471                 : 
     472                 : /*
     473                 :  * DropRelationLocalBuffers
     474                 :  *      This function removes from the buffer pool all the pages of the
     475                 :  *      specified relation that have block numbers >= firstDelBlock.
     476                 :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     477                 :  *      Dirty pages are simply dropped, without bothering to write them
     478                 :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     479                 :  *      used only with extreme caution!
     480                 :  *
     481                 :  *      See DropRelationBuffers in bufmgr.c for more notes.
     482 ECB             :  */
     483                 : void
     484 GNC         346 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
     485                 :                          BlockNumber firstDelBlock)
     486 ECB             : {
     487                 :     int         i;
     488                 : 
     489 GIC      322906 :     for (i = 0; i < NLocBuffer; i++)
     490                 :     {
     491 CBC      322560 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     492 ECB             :         LocalBufferLookupEnt *hresult;
     493                 :         uint32      buf_state;
     494                 : 
     495 GIC      322560 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     496 ECB             : 
     497 GIC      349130 :         if ((buf_state & BM_TAG_VALID) &&
     498 GNC       27455 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
     499             885 :             BufTagGetForkNum(&bufHdr->tag) == forkNum &&
     500 GIC         818 :             bufHdr->tag.blockNum >= firstDelBlock)
     501 ECB             :         {
     502 GIC         792 :             if (LocalRefCount[i] != 0)
     503 UIC           0 :                 elog(ERROR, "block %u of %s is still referenced (local %u)",
     504                 :                      bufHdr->tag.blockNum,
     505                 :                      relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     506                 :                                     MyBackendId,
     507                 :                                     BufTagGetForkNum(&bufHdr->tag)),
     508                 :                      LocalRefCount[i]);
     509                 : 
     510 ECB             :             /* Remove entry from hashtable */
     511                 :             hresult = (LocalBufferLookupEnt *)
     512 GNC         792 :                 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     513 GIC         792 :             if (!hresult)       /* shouldn't happen */
     514 UIC           0 :                 elog(ERROR, "local buffer hash table corrupted");
     515 ECB             :             /* Mark buffer invalid */
     516 GNC         792 :             ClearBufferTag(&bufHdr->tag);
     517 GIC         792 :             buf_state &= ~BUF_FLAG_MASK;
     518             792 :             buf_state &= ~BUF_USAGECOUNT_MASK;
     519             792 :             pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     520                 :         }
     521                 :     }
     522 CBC         346 : }
     523                 : 
     524                 : /*
     525                 :  * DropRelationAllLocalBuffers
     526                 :  *      This function removes from the buffer pool all pages of all forks
     527                 :  *      of the specified relation.
     528                 :  *
     529                 :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     530 ECB             :  */
     531                 : void
     532 GNC        2808 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     533 ECB             : {
     534                 :     int         i;
     535                 : 
     536 CBC     2719232 :     for (i = 0; i < NLocBuffer; i++)
     537 ECB             :     {
     538 GIC     2716424 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     539 EUB             :         LocalBufferLookupEnt *hresult;
     540                 :         uint32      buf_state;
     541                 : 
     542 GBC     2716424 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     543                 : 
     544         2888796 :         if ((buf_state & BM_TAG_VALID) &&
     545 GNC      172372 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     546 EUB             :         {
     547 GIC       14054 :             if (LocalRefCount[i] != 0)
     548 UBC           0 :                 elog(ERROR, "block %u of %s is still referenced (local %u)",
     549 EUB             :                      bufHdr->tag.blockNum,
     550                 :                      relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     551                 :                                     MyBackendId,
     552                 :                                     BufTagGetForkNum(&bufHdr->tag)),
     553                 :                      LocalRefCount[i]);
     554                 :             /* Remove entry from hashtable */
     555                 :             hresult = (LocalBufferLookupEnt *)
     556 GNC       14054 :                 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     557 GIC       14054 :             if (!hresult)       /* shouldn't happen */
     558 LBC           0 :                 elog(ERROR, "local buffer hash table corrupted");
     559                 :             /* Mark buffer invalid */
     560 GNC       14054 :             ClearBufferTag(&bufHdr->tag);
     561 GIC       14054 :             buf_state &= ~BUF_FLAG_MASK;
     562 CBC       14054 :             buf_state &= ~BUF_USAGECOUNT_MASK;
     563 GIC       14054 :             pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     564 ECB             :         }
     565                 :     }
     566 CBC        2808 : }
     567                 : 
     568                 : /*
     569                 :  * InitLocalBuffers -
     570 ECB             :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     571                 :  *    don't involve local buffers, we delay allocating actual memory for the
     572                 :  *    buffers until we need them; just make the buffer headers here.
     573                 :  */
     574                 : static void
     575 CBC         237 : InitLocalBuffers(void)
     576                 : {
     577 GIC         237 :     int         nbufs = num_temp_buffers;
     578 ECB             :     HASHCTL     info;
     579                 :     int         i;
     580                 : 
     581                 :     /*
     582                 :      * Parallel workers can't access data in temporary tables, because they
     583                 :      * have no visibility into the local buffers of their leader.  This is a
     584                 :      * convenient, low-cost place to provide a backstop check for that.  Note
     585                 :      * that we don't wish to prevent a parallel worker from accessing catalog
     586                 :      * metadata about a temp table, so checks at higher levels would be
     587                 :      * inappropriate.
     588                 :      */
     589 GIC         237 :     if (IsParallelWorker())
     590 UIC           0 :         ereport(ERROR,
     591 ECB             :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     592                 :                  errmsg("cannot access temporary tables during a parallel operation")));
     593                 : 
     594                 :     /* Allocate and zero buffer headers and auxiliary arrays */
     595 CBC         237 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     596 GIC         237 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     597             237 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     598             237 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     599 UIC           0 :         ereport(FATAL,
     600                 :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     601                 :                  errmsg("out of memory")));
     602                 : 
     603 GNC         237 :     nextFreeLocalBufId = 0;
     604                 : 
     605                 :     /* initialize fields that need to start off nonzero */
     606 GIC      240153 :     for (i = 0; i < nbufs; i++)
     607                 :     {
     608          239916 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     609 ECB             : 
     610                 :         /*
     611                 :          * negative to indicate local buffer. This is tricky: shared buffers
     612                 :          * start with 0. We have to start with -2. (Note that the routine
     613                 :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     614                 :          * is -1.)
     615                 :          */
     616 GIC      239916 :         buf->buf_id = -i - 2;
     617 ECB             : 
     618                 :         /*
     619                 :          * Intentionally do not initialize the buffer's atomic variable
     620                 :          * (besides zeroing the underlying memory above). That way we get
     621                 :          * errors on platforms without atomics, if somebody (re-)introduces
     622                 :          * atomic operations for local buffers.
     623                 :          */
     624                 :     }
     625                 : 
     626                 :     /* Create the lookup hash table */
     627 GIC         237 :     info.keysize = sizeof(BufferTag);
     628 CBC         237 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     629 ECB             : 
     630 GIC         237 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     631                 :                                nbufs,
     632                 :                                &info,
     633                 :                                HASH_ELEM | HASH_BLOBS);
     634                 : 
     635             237 :     if (!LocalBufHash)
     636 UIC           0 :         elog(ERROR, "could not initialize local buffer hash table");
     637                 : 
     638                 :     /* Initialization done, mark buffers allocated */
     639 GIC         237 :     NLocBuffer = nbufs;
     640             237 : }
     641                 : 
     642                 : /*
     643                 :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     644                 :  * that does not support adjusting the usagecount - but so far it does not
     645                 :  * seem worth the trouble.
     646                 :  */
     647                 : bool
     648 GNC     1258899 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     649                 : {
     650                 :     uint32      buf_state;
     651         1258899 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     652         1258899 :     int         bufid = -buffer - 1;
     653                 : 
     654         1258899 :     buf_state = pg_atomic_read_u32(&buf_hdr->state);
     655                 : 
     656         1258899 :     if (LocalRefCount[bufid] == 0)
     657                 :     {
     658         1174411 :         NLocalPinnedBuffers++;
     659         1174411 :         if (adjust_usagecount &&
     660         1159115 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     661                 :         {
     662           51077 :             buf_state += BUF_USAGECOUNT_ONE;
     663           51077 :             pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     664                 :         }
     665                 :     }
     666         1258899 :     LocalRefCount[bufid]++;
     667         1258899 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     668                 :                                 BufferDescriptorGetBuffer(buf_hdr));
     669                 : 
     670         1258899 :     return buf_state & BM_VALID;
     671                 : }
     672                 : 
     673                 : void
     674         1605766 : UnpinLocalBuffer(Buffer buffer)
     675                 : {
     676         1605766 :     int         buffid = -buffer - 1;
     677                 : 
     678         1605766 :     Assert(BufferIsLocal(buffer));
     679         1605766 :     Assert(LocalRefCount[buffid] > 0);
     680         1605766 :     Assert(NLocalPinnedBuffers > 0);
     681                 : 
     682         1605766 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     683         1605766 :     if (--LocalRefCount[buffid] == 0)
     684         1174411 :         NLocalPinnedBuffers--;
     685         1605766 : }
     686                 : 
     687                 : /*
     688                 :  * GUC check_hook for temp_buffers
     689                 :  */
     690                 : bool
     691            1861 : check_temp_buffers(int *newval, void **extra, GucSource source)
     692                 : {
     693                 :     /*
     694                 :      * Once local buffers have been initialized, it's too late to change this.
     695                 :      * However, if this is only a test call, allow it.
     696                 :      */
     697            1861 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     698                 :     {
     699 UNC           0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     700               0 :         return false;
     701                 :     }
     702 GNC        1861 :     return true;
     703                 : }
     704                 : 
     705                 : /*
     706 ECB             :  * GetLocalBufferStorage - allocate memory for a local buffer
     707                 :  *
     708                 :  * The idea of this function is to aggregate our requests for storage
     709                 :  * so that the memory manager doesn't see a whole lot of relatively small
     710                 :  * requests.  Since we'll never give back a local buffer once it's created
     711                 :  * within a particular process, no point in burdening memmgr with separately
     712                 :  * managed chunks.
     713                 :  */
     714                 : static Block
     715 GIC       14051 : GetLocalBufferStorage(void)
     716                 : {
     717 ECB             :     static char *cur_block = NULL;
     718                 :     static int  next_buf_in_block = 0;
     719                 :     static int  num_bufs_in_block = 0;
     720                 :     static int  total_bufs_allocated = 0;
     721                 :     static MemoryContext LocalBufferContext = NULL;
     722                 : 
     723                 :     char       *this_buf;
     724                 : 
     725 GBC       14051 :     Assert(total_bufs_allocated < NLocBuffer);
     726                 : 
     727 GIC       14051 :     if (next_buf_in_block >= num_bufs_in_block)
     728                 :     {
     729                 :         /* Need to make a new request to memmgr */
     730                 :         int         num_bufs;
     731                 : 
     732                 :         /*
     733                 :          * We allocate local buffers in a context of their own, so that the
     734 ECB             :          * space eaten for them is easily recognizable in MemoryContextStats
     735                 :          * output.  Create the context on first use.
     736 EUB             :          */
     737 GIC         375 :         if (LocalBufferContext == NULL)
     738 CBC         237 :             LocalBufferContext =
     739             237 :                 AllocSetContextCreate(TopMemoryContext,
     740 ECB             :                                       "LocalBufferContext",
     741                 :                                       ALLOCSET_DEFAULT_SIZES);
     742                 : 
     743                 :         /* Start with a 16-buffer request; subsequent ones double each time */
     744 CBC         375 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     745                 :         /* But not more than what we need for all remaining local bufs */
     746 GIC         375 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     747                 :         /* And don't overflow MaxAllocSize, either */
     748             375 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     749                 : 
     750                 :         /* Buffers should be I/O aligned. */
     751 GNC         375 :         cur_block = (char *)
     752             375 :             TYPEALIGN(PG_IO_ALIGN_SIZE,
     753                 :                       MemoryContextAlloc(LocalBufferContext,
     754                 :                                          num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
     755 GIC         375 :         next_buf_in_block = 0;
     756             375 :         num_bufs_in_block = num_bufs;
     757 ECB             :     }
     758                 : 
     759                 :     /* Allocate next buffer in current memory block */
     760 GIC       14051 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     761 CBC       14051 :     next_buf_in_block++;
     762 GIC       14051 :     total_bufs_allocated++;
     763 ECB             : 
     764 GIC       14051 :     return (Block) this_buf;
     765                 : }
     766                 : 
     767 ECB             : /*
     768                 :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     769                 :  *
     770                 :  * This is just like CheckForBufferLeaks(), but for local buffers.
     771                 :  */
     772                 : static void
     773 GBC      499458 : CheckForLocalBufferLeaks(void)
     774                 : {
     775                 : #ifdef USE_ASSERT_CHECKING
     776 GIC      499458 :     if (LocalRefCount)
     777                 :     {
     778           46924 :         int         RefCountErrors = 0;
     779                 :         int         i;
     780                 : 
     781 CBC    47977904 :         for (i = 0; i < NLocBuffer; i++)
     782 ECB             :         {
     783 GBC    47930980 :             if (LocalRefCount[i] != 0)
     784                 :             {
     785 LBC           0 :                 Buffer      b = -i - 1;
     786 ECB             : 
     787 LBC           0 :                 PrintBufferLeakWarning(b);
     788               0 :                 RefCountErrors++;
     789                 :             }
     790                 :         }
     791 CBC       46924 :         Assert(RefCountErrors == 0);
     792                 :     }
     793                 : #endif
     794 GIC      499458 : }
     795                 : 
     796                 : /*
     797                 :  * AtEOXact_LocalBuffers - clean up at end of transaction.
     798                 :  *
     799                 :  * This is just like AtEOXact_Buffers, but for local buffers.
     800 ECB             :  */
     801                 : void
     802 CBC      486167 : AtEOXact_LocalBuffers(bool isCommit)
     803                 : {
     804 GIC      486167 :     CheckForLocalBufferLeaks();
     805          486167 : }
     806                 : 
     807                 : /*
     808                 :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
     809                 :  *
     810                 :  * This is just like AtProcExit_Buffers, but for local buffers.
     811                 :  */
     812                 : void
     813           13291 : AtProcExit_LocalBuffers(void)
     814 ECB             : {
     815 EUB             :     /*
     816                 :      * We shouldn't be holding any remaining pins; if we are, and assertions
     817                 :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
     818                 :      * drop the temp rels.
     819                 :      */
     820 CBC       13291 :     CheckForLocalBufferLeaks();
     821           13291 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a