TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "catalog/catalog.h"
20 : #include "executor/instrument.h"
21 : #include "pgstat.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "utils/guc_hooks.h"
25 : #include "utils/memutils.h"
26 : #include "utils/resowner_private.h"
27 :
28 :
/*#define LBDEBUG*/

/* entry for buffer lookup hashtable */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;

/*
 * Map a local buffer header to its data block.  Local buffers carry
 * negative buf_id values starting at -2 (see InitLocalBuffers), hence the
 * -(buf_id + 2) index computation.
 *
 * Note: this macro only works on local buffers, not shared ones!
 */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]

int			NLocBuffer = 0;		/* until buffers are initialized */

/* Arrays allocated lazily in InitLocalBuffers(), sized NLocBuffer each. */
BufferDesc *LocalBufferDescriptors = NULL;
Block	   *LocalBufferBlockPointers = NULL;
int32	   *LocalRefCount = NULL;

/* clock-sweep hand for GetLocalVictimBuffer() */
static int	nextFreeLocalBufId = 0;

/* BufferTag -> local buffer index lookup table */
static HTAB *LocalBufHash = NULL;

/* number of local buffers pinned at least once */
static int	NLocalPinnedBuffers = 0;


static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);
static Buffer GetLocalVictimBuffer(void);
59 :
60 :
/*
 * PrefetchLocalBuffer -
 *	  initiate asynchronous read of a block of a relation
 *
 * Do PrefetchBuffer's work for temporary relations.
 * No-op if prefetching isn't compiled in.
 *
 * If the block is already present in a local buffer, result.recent_buffer
 * is set to that (negative) local buffer number and no I/O is started.
 */
PrefetchBufferResult
PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
					BlockNumber blockNum)
{
	PrefetchBufferResult result = {InvalidBuffer, false};
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Yes, so nothing to do; report the existing buffer's number */
		result.recent_buffer = -hresult->id - 1;
	}
	else
	{
#ifdef USE_PREFETCH
		/*
		 * Not in buffers, so initiate prefetch.  Skipped when direct I/O is
		 * enabled for data files, since the kernel cache is bypassed then.
		 */
		if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
			smgrprefetch(smgr, forkNum, blockNum))
		{
			result.initiated_io = true;
		}
#endif							/* USE_PREFETCH */
	}

	return result;
}
105 :
106 :
/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need
 * to do any locking since this is all local.  Also, IO_IN_PROGRESS
 * does not get set.  Lastly, we support only default access strategy
 * (hence, usage_count is always advanced).
 *
 * Returns a pinned buffer header; *foundPtr is set true if the page was
 * already present and valid in a local buffer.
 */
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	Buffer		victim_buffer;
	int			bufid;
	bool		found;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Cache hit: pin it (bumping usage count) and report validity */
		bufid = hresult->id;
		bufHdr = GetLocalBufferDescriptor(bufid);
		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));

		*foundPtr = PinLocalBuffer(bufHdr, true);
	}
	else
	{
		uint32		buf_state;

		/* Miss: evict a victim (returned already pinned) and re-tag it */
		victim_buffer = GetLocalVictimBuffer();
		bufid = -victim_buffer - 1;
		bufHdr = GetLocalBufferDescriptor(bufid);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
		if (found)				/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		hresult->id = bufid;

		/*
		 * it's all ours now.
		 */
		bufHdr->tag = newTag;

		/* reset flags and usage count; BM_VALID stays clear until read in */
		buf_state = pg_atomic_read_u32(&bufHdr->state);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		*foundPtr = false;
	}

	return bufHdr;
}
174 :
/*
 * GetLocalVictimBuffer -
 *	  choose an unpinned local buffer for reuse, via clock sweep.
 *
 * The returned buffer is pinned and has no valid tag: if the victim was
 * dirty it is written back first, and any previous hashtable entry for it
 * is removed.  Storage for the buffer is allocated on first use.
 */
static Buffer
GetLocalVictimBuffer(void)
{
	int			victim_bufid;
	int			trycounter;
	uint32		buf_state;
	BufferDesc *bufHdr;

	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		victim_bufid = nextFreeLocalBufId;

		if (++nextFreeLocalBufId >= NLocBuffer)
			nextFreeLocalBufId = 0;

		bufHdr = GetLocalBufferDescriptor(victim_bufid);

		if (LocalRefCount[victim_bufid] == 0)
		{
			buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				/* recently used: decay usage count and keep sweeping */
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else
			{
				/* Found a usable buffer */
				PinLocalBuffer(bufHdr, false);
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * this buffer is not referenced but it might still be dirty. if that's
	 * the case, write it out before reusing it!
	 */
	if (buf_state & BM_DIRTY)
	{
		instr_time	io_start;
		SMgrRelation oreln;
		Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

		/* Find smgr relation for buffer */
		oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyBackendId);

		PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

		io_start = pgstat_prepare_io_time();

		/* And write... */
		smgrwrite(oreln,
				  BufTagGetForkNum(&bufHdr->tag),
				  bufHdr->tag.blockNum,
				  localpage,
				  false);

		/* Temporary table I/O does not use Buffer Access Strategies */
		pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
								IOOP_WRITE, io_start, 1);

		/* Mark not-dirty now in case we error out below */
		buf_state &= ~BM_DIRTY;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		pgBufferUsage.local_blks_written++;
	}

	/*
	 * Remove the victim buffer from the hashtable and mark as invalid.
	 */
	if (buf_state & BM_TAG_VALID)
	{
		LocalBufferLookupEnt *hresult;

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
		if (!hresult)			/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		/* mark buffer invalid just in case hash insert fails */
		ClearBufferTag(&bufHdr->tag);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
		pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}
286 :
287 : /* see LimitAdditionalPins() */
288 : static void
289 10895 : LimitAdditionalLocalPins(uint32 *additional_pins)
290 : {
291 : uint32 max_pins;
292 :
293 10895 : if (*additional_pins <= 1)
294 10709 : return;
295 :
296 : /*
297 : * In contrast to LimitAdditionalPins() other backends don't play a role
298 : * here. We can allow up to NLocBuffer pins in total.
299 ECB : */
300 GNC 186 : max_pins = (NLocBuffer - NLocalPinnedBuffers);
301 EUB :
302 GNC 186 : if (*additional_pins >= max_pins)
303 UNC 0 : *additional_pins = max_pins;
304 : }
305 :
306 : /*
307 : * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
308 : * temporary buffers.
309 : */
310 : BlockNumber
311 GNC 10895 : ExtendBufferedRelLocal(ExtendBufferedWhat eb,
312 : ForkNumber fork,
313 : uint32 flags,
314 : uint32 extend_by,
315 : BlockNumber extend_upto,
316 : Buffer *buffers,
317 : uint32 *extended_by)
318 : {
319 : BlockNumber first_block;
320 : instr_time io_start;
321 :
322 : /* Initialize local buffers if first request in this session */
323 10895 : if (LocalBufHash == NULL)
324 224 : InitLocalBuffers();
325 :
326 10895 : LimitAdditionalLocalPins(&extend_by);
327 :
328 22401 : for (uint32 i = 0; i < extend_by; i++)
329 : {
330 : BufferDesc *buf_hdr;
331 : Block buf_block;
332 :
333 11506 : buffers[i] = GetLocalVictimBuffer();
334 11506 : buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
335 11506 : buf_block = LocalBufHdrGetBlock(buf_hdr);
336 :
337 : /* new buffers are zero-filled */
338 11506 : MemSet((char *) buf_block, 0, BLCKSZ);
339 : }
340 :
341 10895 : first_block = smgrnblocks(eb.smgr, fork);
342 :
343 10895 : if (extend_upto != InvalidBlockNumber)
344 : {
345 : /*
346 : * In contrast to shared relations, nothing could change the relation
347 : * size concurrently. Thus we shouldn't end up finding that we don't
348 : * need to do anything.
349 : */
350 128 : Assert(first_block <= extend_upto);
351 :
352 128 : Assert((uint64) first_block + extend_by <= extend_upto);
353 : }
354 :
355 : /* Fail if relation is already at maximum possible length */
356 10895 : if ((uint64) first_block + extend_by >= MaxBlockNumber)
357 UNC 0 : ereport(ERROR,
358 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : errmsg("cannot extend relation %s beyond %u blocks",
360 : relpath(eb.smgr->smgr_rlocator, fork),
361 : MaxBlockNumber)));
362 :
363 GNC 22401 : for (int i = 0; i < extend_by; i++)
364 : {
365 : int victim_buf_id;
366 : BufferDesc *victim_buf_hdr;
367 : BufferTag tag;
368 : LocalBufferLookupEnt *hresult;
369 : bool found;
370 :
371 11506 : victim_buf_id = -buffers[i] - 1;
372 11506 : victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
373 :
374 11506 : InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
375 :
376 : hresult = (LocalBufferLookupEnt *)
377 11506 : hash_search(LocalBufHash, (void *) &tag, HASH_ENTER, &found);
378 11506 : if (found)
379 : {
380 UNC 0 : BufferDesc *existing_hdr = GetLocalBufferDescriptor(hresult->id);
381 : uint32 buf_state;
382 :
383 0 : UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
384 :
385 0 : existing_hdr = GetLocalBufferDescriptor(hresult->id);
386 0 : PinLocalBuffer(existing_hdr, false);
387 0 : buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
388 :
389 0 : buf_state = pg_atomic_read_u32(&existing_hdr->state);
390 0 : Assert(buf_state & BM_TAG_VALID);
391 0 : Assert(!(buf_state & BM_DIRTY));
392 0 : buf_state &= BM_VALID;
393 0 : pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
394 : }
395 : else
396 : {
397 GNC 11506 : uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
398 :
399 11506 : Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
400 :
401 11506 : victim_buf_hdr->tag = tag;
402 :
403 11506 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
404 :
405 11506 : pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
406 :
407 11506 : hresult->id = victim_buf_id;
408 : }
409 : }
410 :
411 10895 : io_start = pgstat_prepare_io_time();
412 :
413 : /* actually extend relation */
414 10895 : smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
415 :
416 10895 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
417 : io_start, extend_by);
418 :
419 22401 : for (int i = 0; i < extend_by; i++)
420 : {
421 11506 : Buffer buf = buffers[i];
422 : BufferDesc *buf_hdr;
423 : uint32 buf_state;
424 :
425 11506 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
426 :
427 11506 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
428 11506 : buf_state |= BM_VALID;
429 11506 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
430 : }
431 :
432 10895 : *extended_by = extend_by;
433 :
434 10895 : pgBufferUsage.temp_blks_written += extend_by;
435 :
436 10895 : return first_block;
437 ECB : }
438 :
439 : /*
440 : * MarkLocalBufferDirty -
441 : * mark a local buffer dirty
442 : */
443 : void
444 GIC 1817055 : MarkLocalBufferDirty(Buffer buffer)
445 : {
446 : int bufid;
447 ECB : BufferDesc *bufHdr;
448 : uint32 buf_state;
449 :
450 GIC 1817055 : Assert(BufferIsLocal(buffer));
451 ECB :
452 : #ifdef LBDEBUG
453 : fprintf(stderr, "LB DIRTY %d\n", buffer);
454 : #endif
455 :
456 GNC 1817055 : bufid = -buffer - 1;
457 :
458 CBC 1817055 : Assert(LocalRefCount[bufid] > 0);
459 :
460 1817055 : bufHdr = GetLocalBufferDescriptor(bufid);
461 EUB :
462 GIC 1817055 : buf_state = pg_atomic_read_u32(&bufHdr->state);
463 :
464 1817055 : if (!(buf_state & BM_DIRTY))
465 11764 : pgBufferUsage.local_blks_dirtied++;
466 :
467 1817055 : buf_state |= BM_DIRTY;
468 :
469 CBC 1817055 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
470 GIC 1817055 : }
471 :
/*
 * DropRelationLocalBuffers
 *		This function removes from the buffer pool all the pages of the
 *		specified relation that have block numbers >= firstDelBlock.
 *		(In particular, with firstDelBlock = 0, all pages are removed.)
 *		Dirty pages are simply dropped, without bothering to write them
 *		out first.  Therefore, this is NOT rollback-able, and so should be
 *		used only with extreme caution!
 *
 * See DropRelationBuffers in bufmgr.c for more notes.
 */
void
DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
						 BlockNumber firstDelBlock)
{
	int			i;

	/* linear scan of all local buffers; NLocBuffer is typically small */
	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		LocalBufferLookupEnt *hresult;
		uint32		buf_state;

		buf_state = pg_atomic_read_u32(&bufHdr->state);

		if ((buf_state & BM_TAG_VALID) &&
			BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
			BufTagGetForkNum(&bufHdr->tag) == forkNum &&
			bufHdr->tag.blockNum >= firstDelBlock)
		{
			/* a pinned page of a relation being dropped is a caller bug */
			if (LocalRefCount[i] != 0)
				elog(ERROR, "block %u of %s is still referenced (local %u)",
					 bufHdr->tag.blockNum,
					 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
									MyBackendId,
									BufTagGetForkNum(&bufHdr->tag)),
					 LocalRefCount[i]);

			/* Remove entry from hashtable */
			hresult = (LocalBufferLookupEnt *)
				hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
			if (!hresult)		/* shouldn't happen */
				elog(ERROR, "local buffer hash table corrupted");
			/* Mark buffer invalid */
			ClearBufferTag(&bufHdr->tag);
			buf_state &= ~BUF_FLAG_MASK;
			buf_state &= ~BUF_USAGECOUNT_MASK;
			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
		}
	}
}
523 :
/*
 * DropRelationAllLocalBuffers
 *		This function removes from the buffer pool all pages of all forks
 *		of the specified relation.
 *
 * Like DropRelationLocalBuffers, dirty pages are dropped without writeback,
 * so this is not rollback-able.
 *
 * See DropRelationsAllBuffers in bufmgr.c for more notes.
 */
void
DropRelationAllLocalBuffers(RelFileLocator rlocator)
{
	int			i;

	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		LocalBufferLookupEnt *hresult;
		uint32		buf_state;

		buf_state = pg_atomic_read_u32(&bufHdr->state);

		/* match on relation only; all forks and all blocks are dropped */
		if ((buf_state & BM_TAG_VALID) &&
			BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
		{
			if (LocalRefCount[i] != 0)
				elog(ERROR, "block %u of %s is still referenced (local %u)",
					 bufHdr->tag.blockNum,
					 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
									MyBackendId,
									BufTagGetForkNum(&bufHdr->tag)),
					 LocalRefCount[i]);
			/* Remove entry from hashtable */
			hresult = (LocalBufferLookupEnt *)
				hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
			if (!hresult)		/* shouldn't happen */
				elog(ERROR, "local buffer hash table corrupted");
			/* Mark buffer invalid */
			ClearBufferTag(&bufHdr->tag);
			buf_state &= ~BUF_FLAG_MASK;
			buf_state &= ~BUF_USAGECOUNT_MASK;
			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
		}
	}
}
567 :
/*
 * InitLocalBuffers -
 *	  init the local buffer cache. Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 *
 * Raises ERROR in a parallel worker and FATAL on out-of-memory.  On success
 * sets NLocBuffer, which marks the cache as initialized.
 */
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/* Allocate and zero buffer headers and auxiliary arrays */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	nextFreeLocalBufId = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * negative to indicate local buffer. This is tricky: shared buffers
		 * start with 0. We have to start with -2. (Note that the routine
		 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
		 * is -1.)
		 */
		buf->buf_id = -i - 2;

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above). That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table */
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}
641 :
/*
 * PinLocalBuffer -
 *	  pin a local buffer, optionally bumping its usage count.
 *
 * Registers the pin with the current resource owner.  The usage count is
 * advanced (up to BM_MAX_USAGE_COUNT) only on the first pin of a cycle and
 * only when adjust_usagecount is true.  Returns whether the buffer's
 * contents are valid (BM_VALID was set at entry).
 *
 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
 * that does not support adjusting the usagecount - but so far it does not
 * seem worth the trouble.
 */
bool
PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
{
	uint32		buf_state;
	Buffer		buffer = BufferDescriptorGetBuffer(buf_hdr);
	int			bufid = -buffer - 1;

	buf_state = pg_atomic_read_u32(&buf_hdr->state);

	if (LocalRefCount[bufid] == 0)
	{
		/* first pin: track it globally and maybe advance usage count */
		NLocalPinnedBuffers++;
		if (adjust_usagecount &&
			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
		{
			buf_state += BUF_USAGECOUNT_ONE;
			pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
		}
	}
	LocalRefCount[bufid]++;
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf_hdr));

	return buf_state & BM_VALID;
}
672 :
673 : void
674 1605766 : UnpinLocalBuffer(Buffer buffer)
675 : {
676 1605766 : int buffid = -buffer - 1;
677 :
678 1605766 : Assert(BufferIsLocal(buffer));
679 1605766 : Assert(LocalRefCount[buffid] > 0);
680 1605766 : Assert(NLocalPinnedBuffers > 0);
681 :
682 1605766 : ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
683 1605766 : if (--LocalRefCount[buffid] == 0)
684 1174411 : NLocalPinnedBuffers--;
685 1605766 : }
686 :
687 : /*
688 : * GUC check_hook for temp_buffers
689 : */
690 : bool
691 1861 : check_temp_buffers(int *newval, void **extra, GucSource source)
692 : {
693 : /*
694 : * Once local buffers have been initialized, it's too late to change this.
695 : * However, if this is only a test call, allow it.
696 : */
697 1861 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
698 : {
699 UNC 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
700 0 : return false;
701 : }
702 GNC 1861 : return true;
703 : }
704 :
/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, no point in burdening memmgr with separately
 * managed chunks.
 *
 * Allocation state is kept in function-static variables; this is safe
 * because local buffers are inherently single-backend.
 */
static Block
GetLocalBufferStorage(void)
{
	static char *cur_block = NULL;
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		/* Buffers should be I/O aligned. (extra PG_IO_ALIGN_SIZE is slack
		 * for the alignment round-up) */
		cur_block = (char *)
			TYPEALIGN(PG_IO_ALIGN_SIZE,
					  MemoryContextAlloc(LocalBufferContext,
										 num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	return (Block) this_buf;
}
766 :
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 * A no-op in non-assert builds.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				/* convert array index to the (negative) Buffer number */
				Buffer		b = -i - 1;

				PrintBufferLeakWarning(b);
				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}
795 :
/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 * isCommit is currently unused; only the leak check is performed here.
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	CheckForLocalBufferLeaks();
}
806 :
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying to
	 * drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}