Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "catalog/catalog.h"
20 : #include "executor/instrument.h"
21 : #include "pgstat.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "utils/guc_hooks.h"
25 : #include "utils/memutils.h"
26 : #include "utils/resowner_private.h"
27 :
28 :
29 : /*#define LBDEBUG*/
30 :
31 : /* entry for buffer lookup hashtable */
32 : typedef struct
33 : {
34 : BufferTag key; /* Tag of a disk page */
35 : int id; /* Associated local buffer's index */
36 : } LocalBufferLookupEnt;
37 :
38 : /* Note: this macro only works on local buffers, not shared ones! */
39 : #define LocalBufHdrGetBlock(bufHdr) \
40 : LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
41 :
42 : int NLocBuffer = 0; /* until buffers are initialized */
43 :
44 : BufferDesc *LocalBufferDescriptors = NULL;
45 : Block *LocalBufferBlockPointers = NULL;
46 : int32 *LocalRefCount = NULL;
47 :
48 : static int nextFreeLocalBufId = 0;
49 :
50 : static HTAB *LocalBufHash = NULL;
51 :
52 : /* number of local buffers pinned at least once */
53 : static int NLocalPinnedBuffers = 0;
54 :
55 :
56 : static void InitLocalBuffers(void);
57 : static Block GetLocalBufferStorage(void);
58 : static Buffer GetLocalVictimBuffer(void);
59 :
60 :
61 : /*
62 : * PrefetchLocalBuffer -
63 : * initiate asynchronous read of a block of a relation
64 : *
65 : * Do PrefetchBuffer's work for temporary relations.
66 : * No-op if prefetching isn't compiled in.
67 : */
68 : PrefetchBufferResult
1096 tmunro 69 GIC 6244 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
70 : BlockNumber blockNum)
71 : {
72 6244 : PrefetchBufferResult result = {InvalidBuffer, false};
73 : BufferTag newTag; /* identity of requested block */
5200 tgl 74 ECB : LocalBufferLookupEnt *hresult;
75 :
256 rhaas 76 GNC 6244 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
5200 tgl 77 ECB :
78 : /* Initialize local buffers if first request in this session */
5200 tgl 79 GIC 6244 : if (LocalBufHash == NULL)
5200 tgl 80 UIC 0 : InitLocalBuffers();
5200 tgl 81 ECB :
82 : /* See if the desired buffer already exists */
83 : hresult = (LocalBufferLookupEnt *)
62 peter 84 GNC 6244 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
5200 tgl 85 EUB :
5200 tgl 86 GIC 6244 : if (hresult)
87 : {
88 : /* Yes, so nothing to do */
1096 tmunro 89 CBC 6244 : result.recent_buffer = -hresult->id - 1;
90 : }
1096 tmunro 91 ECB : else
92 : {
93 : #ifdef USE_PREFETCH
94 : /* Not in buffers, so initiate prefetch */
1 tmunro 95 UNC 0 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
96 0 : smgrprefetch(smgr, forkNum, blockNum))
97 : {
98 0 : result.initiated_io = true;
99 : }
100 : #endif /* USE_PREFETCH */
101 : }
102 :
1096 tmunro 103 GBC 6244 : return result;
5200 tgl 104 EUB : }
105 :
106 :
/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need
 * to do any locking since this is all local.  Also, IO_IN_PROGRESS
 * does not get set.  Lastly, we support only default access strategy
 * (hence, usage_count is always advanced).
 *
 * On return, *foundPtr says whether the buffer already held valid contents
 * for this page; the buffer is pinned in either case.
 */
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	Buffer		victim_buffer;
	int			bufid;
	bool		found;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Cache hit: pin it, bumping the usage count. */
		bufid = hresult->id;
		bufHdr = GetLocalBufferDescriptor(bufid);
		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));

		/* PinLocalBuffer() reports whether the contents are already valid */
		*foundPtr = PinLocalBuffer(bufHdr, true);
	}
	else
	{
		uint32		buf_state;

		/* Cache miss: evict some buffer (writing it out if dirty). */
		victim_buffer = GetLocalVictimBuffer();
		bufid = -victim_buffer - 1;
		bufHdr = GetLocalBufferDescriptor(bufid);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
		if (found)				/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		hresult->id = bufid;

		/*
		 * it's all ours now.
		 */
		bufHdr->tag = newTag;

		/* Reset flags and usage count; mark the tag valid with one use. */
		buf_state = pg_atomic_read_u32(&bufHdr->state);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		/* BM_VALID is not set: caller must read the page in. */
		*foundPtr = false;
	}

	return bufHdr;
}
174 :
/*
 * GetLocalVictimBuffer -
 *	  select and prepare a local buffer for reuse
 *
 * Runs a clock-sweep over the local buffer pool to find an unpinned buffer,
 * pins it, lazily allocates its backing storage on first use, writes it out
 * if dirty, and detaches it from its old identity (hash entry and tag).
 * Returns the buffer (as a negative local Buffer number), already pinned.
 *
 * Errors out if every local buffer is pinned.
 */
static Buffer
GetLocalVictimBuffer(void)
{
	int			victim_bufid;
	int			trycounter;
	uint32		buf_state;
	BufferDesc *bufHdr;

	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		victim_bufid = nextFreeLocalBufId;

		/* Advance the clock hand, wrapping at the end of the pool. */
		if (++nextFreeLocalBufId >= NLocBuffer)
			nextFreeLocalBufId = 0;

		bufHdr = GetLocalBufferDescriptor(victim_bufid);

		if (LocalRefCount[victim_bufid] == 0)
		{
			buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				/* Recently used: decay the usage count, keep sweeping. */
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				/* reset the try counter: an unpinned buffer still exists */
				trycounter = NLocBuffer;
			}
			else
			{
				/* Found a usable buffer */
				PinLocalBuffer(bufHdr, false);
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * this buffer is not referenced but it might still be dirty. if that's
	 * the case, write it out before reusing it!
	 */
	if (buf_state & BM_DIRTY)
	{
		instr_time	io_start;
		SMgrRelation oreln;
		Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

		/* Find smgr relation for buffer */
		oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyBackendId);

		PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

		io_start = pgstat_prepare_io_time();

		/* And write... */
		smgrwrite(oreln,
				  BufTagGetForkNum(&bufHdr->tag),
				  bufHdr->tag.blockNum,
				  localpage,
				  false);

		/* Temporary table I/O does not use Buffer Access Strategies */
		pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
								IOOP_WRITE, io_start, 1);

		/* Mark not-dirty now in case we error out below */
		buf_state &= ~BM_DIRTY;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		pgBufferUsage.local_blks_written++;
	}

	/*
	 * Remove the victim buffer from the hashtable and mark as invalid.
	 */
	if (buf_state & BM_TAG_VALID)
	{
		LocalBufferLookupEnt *hresult;

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
		if (!hresult)			/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		/* mark buffer invalid just in case hash insert fails */
		ClearBufferTag(&bufHdr->tag);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
		pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}
286 :
287 : /* see LimitAdditionalPins() */
288 : static void
289 10895 : LimitAdditionalLocalPins(uint32 *additional_pins)
290 : {
291 : uint32 max_pins;
292 :
293 10895 : if (*additional_pins <= 1)
294 10709 : return;
295 :
296 : /*
297 : * In contrast to LimitAdditionalPins() other backends don't play a role
298 : * here. We can allow up to NLocBuffer pins in total.
4 andres 299 ECB : */
4 andres 300 GNC 186 : max_pins = (NLocBuffer - NLocalPinnedBuffers);
4 andres 301 EUB :
4 andres 302 GNC 186 : if (*additional_pins >= max_pins)
4 andres 303 UNC 0 : *additional_pins = max_pins;
304 : }
305 :
306 : /*
307 : * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
308 : * temporary buffers.
309 : */
310 : BlockNumber
4 andres 311 GNC 10895 : ExtendBufferedRelLocal(ExtendBufferedWhat eb,
312 : ForkNumber fork,
313 : uint32 flags,
314 : uint32 extend_by,
315 : BlockNumber extend_upto,
316 : Buffer *buffers,
317 : uint32 *extended_by)
318 : {
319 : BlockNumber first_block;
320 : instr_time io_start;
321 :
322 : /* Initialize local buffers if first request in this session */
323 10895 : if (LocalBufHash == NULL)
324 224 : InitLocalBuffers();
325 :
326 10895 : LimitAdditionalLocalPins(&extend_by);
327 :
328 22401 : for (uint32 i = 0; i < extend_by; i++)
329 : {
330 : BufferDesc *buf_hdr;
331 : Block buf_block;
332 :
333 11506 : buffers[i] = GetLocalVictimBuffer();
334 11506 : buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
335 11506 : buf_block = LocalBufHdrGetBlock(buf_hdr);
336 :
337 : /* new buffers are zero-filled */
338 11506 : MemSet((char *) buf_block, 0, BLCKSZ);
339 : }
340 :
341 10895 : first_block = smgrnblocks(eb.smgr, fork);
342 :
343 10895 : if (extend_upto != InvalidBlockNumber)
344 : {
345 : /*
346 : * In contrast to shared relations, nothing could change the relation
347 : * size concurrently. Thus we shouldn't end up finding that we don't
348 : * need to do anything.
349 : */
350 128 : Assert(first_block <= extend_upto);
351 :
352 128 : Assert((uint64) first_block + extend_by <= extend_upto);
353 : }
354 :
355 : /* Fail if relation is already at maximum possible length */
356 10895 : if ((uint64) first_block + extend_by >= MaxBlockNumber)
4 andres 357 UNC 0 : ereport(ERROR,
358 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : errmsg("cannot extend relation %s beyond %u blocks",
360 : relpath(eb.smgr->smgr_rlocator, fork),
361 : MaxBlockNumber)));
362 :
4 andres 363 GNC 22401 : for (int i = 0; i < extend_by; i++)
364 : {
365 : int victim_buf_id;
366 : BufferDesc *victim_buf_hdr;
367 : BufferTag tag;
368 : LocalBufferLookupEnt *hresult;
369 : bool found;
370 :
371 11506 : victim_buf_id = -buffers[i] - 1;
372 11506 : victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
373 :
374 11506 : InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
375 :
376 : hresult = (LocalBufferLookupEnt *)
377 11506 : hash_search(LocalBufHash, (void *) &tag, HASH_ENTER, &found);
378 11506 : if (found)
379 : {
4 andres 380 UNC 0 : BufferDesc *existing_hdr = GetLocalBufferDescriptor(hresult->id);
381 : uint32 buf_state;
382 :
383 0 : UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
384 :
385 0 : existing_hdr = GetLocalBufferDescriptor(hresult->id);
386 0 : PinLocalBuffer(existing_hdr, false);
387 0 : buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
388 :
389 0 : buf_state = pg_atomic_read_u32(&existing_hdr->state);
390 0 : Assert(buf_state & BM_TAG_VALID);
391 0 : Assert(!(buf_state & BM_DIRTY));
392 0 : buf_state &= BM_VALID;
393 0 : pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
394 : }
395 : else
396 : {
4 andres 397 GNC 11506 : uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
398 :
399 11506 : Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
400 :
401 11506 : victim_buf_hdr->tag = tag;
402 :
403 11506 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
404 :
405 11506 : pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
406 :
407 11506 : hresult->id = victim_buf_id;
408 : }
409 : }
410 :
2 411 10895 : io_start = pgstat_prepare_io_time();
412 :
413 : /* actually extend relation */
4 414 10895 : smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
415 :
2 416 10895 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
417 : io_start, extend_by);
418 :
4 419 22401 : for (int i = 0; i < extend_by; i++)
420 : {
421 11506 : Buffer buf = buffers[i];
422 : BufferDesc *buf_hdr;
423 : uint32 buf_state;
424 :
425 11506 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
426 :
427 11506 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
428 11506 : buf_state |= BM_VALID;
429 11506 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
430 : }
431 :
432 10895 : *extended_by = extend_by;
433 :
434 10895 : pgBufferUsage.temp_blks_written += extend_by;
435 :
436 10895 : return first_block;
4 andres 437 ECB : }
438 :
9770 scrappy 439 : /*
440 : * MarkLocalBufferDirty -
441 : * mark a local buffer dirty
442 : */
443 : void
6218 tgl 444 GIC 1817055 : MarkLocalBufferDirty(Buffer buffer)
445 : {
446 : int bufid;
6610 tgl 447 ECB : BufferDesc *bufHdr;
448 : uint32 buf_state;
449 :
9345 bruce 450 GIC 1817055 : Assert(BufferIsLocal(buffer));
9770 scrappy 451 ECB :
452 : #ifdef LBDEBUG
453 : fprintf(stderr, "LB DIRTY %d\n", buffer);
454 : #endif
455 :
10 andres 456 GNC 1817055 : bufid = -buffer - 1;
457 :
6610 tgl 458 CBC 1817055 : Assert(LocalRefCount[bufid] > 0);
459 :
2992 andres 460 1817055 : bufHdr = GetLocalBufferDescriptor(bufid);
4064 rhaas 461 EUB :
2552 andres 462 GIC 1817055 : buf_state = pg_atomic_read_u32(&bufHdr->state);
463 :
2555 464 1817055 : if (!(buf_state & BM_DIRTY))
465 11764 : pgBufferUsage.local_blks_dirtied++;
466 :
2552 467 1817055 : buf_state |= BM_DIRTY;
468 :
2375 andres 469 CBC 1817055 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
9770 scrappy 470 GIC 1817055 : }
471 :
472 : /*
473 : * DropRelationLocalBuffers
474 : * This function removes from the buffer pool all the pages of the
475 : * specified relation that have block numbers >= firstDelBlock.
476 : * (In particular, with firstDelBlock = 0, all pages are removed.)
477 : * Dirty pages are simply dropped, without bothering to write them
478 : * out first. Therefore, this is NOT rollback-able, and so should be
479 : * used only with extreme caution!
480 : *
481 : * See DropRelationBuffers in bufmgr.c for more notes.
6352 tgl 482 ECB : */
483 : void
271 rhaas 484 GNC 346 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
485 : BlockNumber firstDelBlock)
6352 tgl 486 ECB : {
487 : int i;
488 :
6352 tgl 489 GIC 322906 : for (i = 0; i < NLocBuffer; i++)
490 : {
2992 andres 491 CBC 322560 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
6352 tgl 492 ECB : LocalBufferLookupEnt *hresult;
2555 andres 493 : uint32 buf_state;
494 :
2555 andres 495 GIC 322560 : buf_state = pg_atomic_read_u32(&bufHdr->state);
2555 andres 496 ECB :
2555 andres 497 GIC 349130 : if ((buf_state & BM_TAG_VALID) &&
228 rhaas 498 GNC 27455 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
499 885 : BufTagGetForkNum(&bufHdr->tag) == forkNum &&
6352 tgl 500 GIC 818 : bufHdr->tag.blockNum >= firstDelBlock)
6352 tgl 501 ECB : {
6352 tgl 502 GIC 792 : if (LocalRefCount[i] != 0)
5262 heikki.linnakangas 503 UIC 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
504 : bufHdr->tag.blockNum,
505 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
506 : MyBackendId,
507 : BufTagGetForkNum(&bufHdr->tag)),
508 : LocalRefCount[i]);
509 :
6352 tgl 510 ECB : /* Remove entry from hashtable */
511 : hresult = (LocalBufferLookupEnt *)
62 peter 512 GNC 792 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
6347 bruce 513 GIC 792 : if (!hresult) /* shouldn't happen */
6352 tgl 514 UIC 0 : elog(ERROR, "local buffer hash table corrupted");
6352 tgl 515 ECB : /* Mark buffer invalid */
256 rhaas 516 GNC 792 : ClearBufferTag(&bufHdr->tag);
2555 andres 517 GIC 792 : buf_state &= ~BUF_FLAG_MASK;
518 792 : buf_state &= ~BUF_USAGECOUNT_MASK;
2375 519 792 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
520 : }
521 : }
6352 tgl 522 CBC 346 : }
523 :
524 : /*
525 : * DropRelationAllLocalBuffers
526 : * This function removes from the buffer pool all pages of all forks
527 : * of the specified relation.
528 : *
529 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
3958 tgl 530 ECB : */
531 : void
271 rhaas 532 GNC 2808 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
3958 tgl 533 ECB : {
534 : int i;
535 :
3958 tgl 536 CBC 2719232 : for (i = 0; i < NLocBuffer; i++)
3958 tgl 537 ECB : {
2992 andres 538 GIC 2716424 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
3958 tgl 539 EUB : LocalBufferLookupEnt *hresult;
540 : uint32 buf_state;
541 :
2555 andres 542 GBC 2716424 : buf_state = pg_atomic_read_u32(&bufHdr->state);
543 :
544 2888796 : if ((buf_state & BM_TAG_VALID) &&
228 rhaas 545 GNC 172372 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
3958 tgl 546 EUB : {
3958 tgl 547 GIC 14054 : if (LocalRefCount[i] != 0)
3958 tgl 548 UBC 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
3958 tgl 549 EUB : bufHdr->tag.blockNum,
550 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
551 : MyBackendId,
552 : BufTagGetForkNum(&bufHdr->tag)),
553 : LocalRefCount[i]);
554 : /* Remove entry from hashtable */
555 : hresult = (LocalBufferLookupEnt *)
62 peter 556 GNC 14054 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
3958 tgl 557 GIC 14054 : if (!hresult) /* shouldn't happen */
3958 tgl 558 LBC 0 : elog(ERROR, "local buffer hash table corrupted");
559 : /* Mark buffer invalid */
256 rhaas 560 GNC 14054 : ClearBufferTag(&bufHdr->tag);
2555 andres 561 GIC 14054 : buf_state &= ~BUF_FLAG_MASK;
2555 andres 562 CBC 14054 : buf_state &= ~BUF_USAGECOUNT_MASK;
2375 andres 563 GIC 14054 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
3958 tgl 564 ECB : }
565 : }
3958 tgl 566 CBC 2808 : }
567 :
/*
 * InitLocalBuffers -
 *	  init the local buffer cache. Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 */
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/* Allocate and zero buffer headers and auxiliary arrays */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	/* Clock sweep starts at the first buffer */
	nextFreeLocalBufId = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * negative to indicate local buffer. This is tricky: shared buffers
		 * start with 0. We have to start with -2. (Note that the routine
		 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
		 * is -1.)
		 */
		buf->buf_id = -i - 2;

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above). That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table; keys are raw BufferTags (HASH_BLOBS) */
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}
641 :
/*
 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
 * that does not support adjusting the usagecount - but so far it does not
 * seem worth the trouble.
 *
 * Pin the given local buffer.  If adjust_usagecount is set and this is the
 * first pin by this backend, the usage count is advanced (capped at
 * BM_MAX_USAGE_COUNT).  Returns whether the buffer's contents were valid
 * (BM_VALID) at the time the state was read.
 */
bool
PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
{
	uint32		buf_state;
	Buffer		buffer = BufferDescriptorGetBuffer(buf_hdr);
	int			bufid = -buffer - 1;

	buf_state = pg_atomic_read_u32(&buf_hdr->state);

	if (LocalRefCount[bufid] == 0)
	{
		/* First pin of this buffer by this backend */
		NLocalPinnedBuffers++;
		if (adjust_usagecount &&
			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
		{
			buf_state += BUF_USAGECOUNT_ONE;
			pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
		}
	}
	LocalRefCount[bufid]++;
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf_hdr));

	return buf_state & BM_VALID;
}
672 :
673 : void
674 1605766 : UnpinLocalBuffer(Buffer buffer)
675 : {
676 1605766 : int buffid = -buffer - 1;
677 :
678 1605766 : Assert(BufferIsLocal(buffer));
679 1605766 : Assert(LocalRefCount[buffid] > 0);
680 1605766 : Assert(NLocalPinnedBuffers > 0);
681 :
682 1605766 : ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
683 1605766 : if (--LocalRefCount[buffid] == 0)
684 1174411 : NLocalPinnedBuffers--;
685 1605766 : }
686 :
687 : /*
688 : * GUC check_hook for temp_buffers
689 : */
690 : bool
208 tgl 691 1861 : check_temp_buffers(int *newval, void **extra, GucSource source)
692 : {
693 : /*
694 : * Once local buffers have been initialized, it's too late to change this.
695 : * However, if this is only a test call, allow it.
696 : */
697 1861 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
698 : {
208 tgl 699 UNC 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
700 0 : return false;
701 : }
208 tgl 702 GNC 1861 : return true;
703 : }
704 :
/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, no point in burdening memmgr with separately
 * managed chunks.
 */
static Block
GetLocalBufferStorage(void)
{
	/* Allocation state persists across calls within this backend. */
	static char *cur_block = NULL;
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		/* Buffers should be I/O aligned. */
		cur_block = (char *)
			TYPEALIGN(PG_IO_ALIGN_SIZE,
					  MemoryContextAlloc(LocalBufferContext,
										 num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	return (Block) this_buf;
}
766 :
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount != NULL)
	{
		int			nleaked = 0;

		for (int i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] == 0)
				continue;

			/* Report each still-pinned buffer, then fail below. */
			PrintBufferLeakWarning(-i - 1);
			nleaked++;
		}
		Assert(nleaked == 0);
	}
#endif
}
795 :
/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	/* isCommit is unused: pins must be gone regardless of the outcome */
	CheckForLocalBufferLeaks();
}
806 :
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying to
	 * drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
|