Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * hio.c
4 : * POSTGRES heap access method input/output code.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/heap/hio.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/heapam.h"
19 : #include "access/hio.h"
20 : #include "access/htup_details.h"
21 : #include "access/visibilitymap.h"
22 : #include "storage/bufmgr.h"
23 : #include "storage/freespace.h"
24 : #include "storage/lmgr.h"
25 : #include "storage/smgr.h"
26 :
27 :
   28 : /*
   29 : * RelationPutHeapTuple - place tuple at specified page
   30 : *
   31 : * !!! EREPORT(ERROR) IS DISALLOWED HERE !!! Must PANIC on failure!!!
   32 : *
   33 : * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
   34 : */
   35 : void
9770 scrappy 36 CBC 15256234 : RelationPutHeapTuple(Relation relation,
   37 : Buffer buffer,
   38 : HeapTuple tuple,
   39 : bool token)
   40 : {
   41 : Page pageHeader;
   42 : OffsetNumber offnum;
   43 :
   44 : /*
   45 : * A tuple that's being inserted speculatively should already have its
   46 : * token set.
   47 : */
2893 andres 48 15256234 : Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
   49 :
   50 : /*
   51 : * Do not allow tuples with invalid combinations of hint bits to be placed
   52 : * on a page. This combination is detected as corruption by the
   53 : * contrib/amcheck logic, so if you disable this assertion, make
   54 : * corresponding changes there.
   55 : */
899 rhaas 56 15256234 : Assert(!((tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED) &&
   57 : (tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)));
   58 :
   59 : /* Add the tuple to the page */
2545 kgrittn 60 15256234 : pageHeader = BufferGetPage(buffer);
   61 :
    : /* NOTE(review): InvalidOffsetNumber asks PageAddItem to choose the
    :  * slot itself; the trailing arguments appear to be the overwrite=false
    :  * and is_heap=true flags -- confirm against PageAddItem() in bufpage.c.
    :  */
7940 tgl 62 15256234 : offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
   63 : tuple->t_len, InvalidOffsetNumber, false, true);
   64 :
8315 vadim4o 65 15256234 : if (offnum == InvalidOffsetNumber)
7202 tgl 66 UBC 0 : elog(PANIC, "failed to add tuple to page");
   67 :
   68 : /* Update tuple->t_self to the actual position where it was stored */
7940 tgl 69 CBC 15256234 : ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
   70 :
   71 : /*
   72 : * Insert the correct position into CTID of the stored tuple, too (unless
   73 : * this is a speculative insertion, in which case the token is held in
   74 : * CTID field instead)
   75 : */
2893 andres 76 15256234 : if (!token)
   77 : {
   78 15254221 : ItemId itemId = PageGetItemId(pageHeader, offnum);
1866 tgl 79 15254221 : HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
   80 :
   81 15254221 : item->t_ctid = tuple->t_self;
   82 : }
9770 scrappy 83 15256234 : }
84 :
   85 : /*
   86 : * Read in a buffer in mode, using bulk-insert strategy if bistate isn't NULL.
    :  *
    :  * NOTE(review): when bistate is supplied, bistate->current_buf keeps a pin
    :  * of its own (taken via IncrBufferRefCount below) in addition to the pin
    :  * returned to the caller; both pins must eventually be released.
   87 : */
   88 : static Buffer
5267 tgl 89 12680066 : ReadBufferBI(Relation relation, BlockNumber targetBlock,
   90 : ReadBufferMode mode, BulkInsertState bistate)
   91 : {
   92 : Buffer buffer;
   93 :
   94 : /* If not bulk-insert, exactly like ReadBuffer */
   95 12680066 : if (!bistate)
1526 andres 96 11559672 : return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
   97 : mode, NULL);
   98 :
   99 : /* If we have the desired block already pinned, re-pin and return it */
5267 tgl 100 1120394 : if (bistate->current_buf != InvalidBuffer)
  101 : {
  102 1120232 : if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
  103 : {
  104 : /*
  105 : * Currently the LOCK variants are only used for extending
  106 : * relation, which should never reach this branch.
  107 : */
1526 andres 108 1112439 : Assert(mode != RBM_ZERO_AND_LOCK &&
  109 : mode != RBM_ZERO_AND_CLEANUP_LOCK);
  110 :
5267 tgl 111 1112439 : IncrBufferRefCount(bistate->current_buf);
  112 1112439 : return bistate->current_buf;
  113 : }
  114 : /* ... else drop the old buffer */
  115 7793 : ReleaseBuffer(bistate->current_buf);
  116 7793 : bistate->current_buf = InvalidBuffer;
  117 : }
  118 :
  119 : /* Perform a read using the buffer strategy */
  120 7955 : buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
  121 : mode, bistate->strategy);
  122 :
  123 : /* Save the selected block as target for future inserts */
  124 7955 : IncrBufferRefCount(buffer);
  125 7955 : bistate->current_buf = buffer;
  126 :
  127 7955 : return buffer;
  128 : }
129 :
  130 : /*
  131 : * For each heap page which is all-visible, acquire a pin on the appropriate
  132 : * visibility map page, if we haven't already got one.
  133 : *
  134 : * To avoid complexity in the callers, either buffer1 or buffer2 may be
  135 : * InvalidBuffer if only one buffer is involved. For the same reason, block2
  136 : * may be smaller than block1.
  137 : *
  138 : * Returns whether buffer locks were temporarily released.
  139 : */
  140 : static bool
4304 rhaas 141 GIC 12887067 : GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
  142 : BlockNumber block1, BlockNumber block2,
4304 rhaas 143 ECB : Buffer *vmbuffer1, Buffer *vmbuffer2)
  144 : {
  145 : bool need_to_pin_buffer1;
  146 : bool need_to_pin_buffer2;
3 andres 147 GNC 12887067 : bool released_locks = false;
  148 :
  149 : /*
  150 : * Swap buffers around to handle case of a single block/buffer, and to
  151 : * handle if lock ordering rules require to lock block2 first.
    :  *
    :  * NOTE(review): buffers of one relation must be locked in increasing
    :  * block-number order (see the deadlock-avoidance discussion above
    :  * RelationGetBufferForTuple), hence the swap when block1 > block2.
  152 : */
  153 25773775 : if (!BufferIsValid(buffer1) ||
  154 13093350 : (BufferIsValid(buffer2) && block1 > block2))
  155 : {
  156 195115 : Buffer tmpbuf = buffer1;
  157 195115 : Buffer *tmpvmbuf = vmbuffer1;
  158 195115 : BlockNumber tmpblock = block1;
  159 :
  160 195115 : buffer1 = buffer2;
  161 195115 : vmbuffer1 = vmbuffer2;
  162 195115 : block1 = block2;
  163 :
  164 195115 : buffer2 = tmpbuf;
  165 195115 : vmbuffer2 = tmpvmbuf;
  166 195115 : block2 = tmpblock;
  167 : }
  168 :
4304 rhaas 169 GIC 12887067 : Assert(BufferIsValid(buffer1));
1527 akapila 170 CBC 12887067 : Assert(buffer2 == InvalidBuffer || block1 <= block2);
  171 :
  172 : while (1)
  173 : {
  174 : /* Figure out which pins we need but don't have. */
2545 kgrittn 175 GIC 12887067 : need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
4304 rhaas 176 CBC 12887067 : && !visibilitymap_pin_ok(block1, *vmbuffer1);
  177 12887067 : need_to_pin_buffer2 = buffer2 != InvalidBuffer
2545 kgrittn 178 GIC 206642 : && PageIsAllVisible(BufferGetPage(buffer2))
4304 rhaas 179 CBC 13093709 : && !visibilitymap_pin_ok(block2, *vmbuffer2);
  180 12887067 : if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
3 andres 181 GNC 12887067 : break;
  182 :
4304 rhaas 183 ECB : /* We must unlock both buffers before doing any I/O. */
3 andres 184 UNC 0 : released_locks = true;
4304 rhaas 185 LBC 0 : LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
  186 0 : if (buffer2 != InvalidBuffer && buffer2 != buffer1)
4304 rhaas 187 UIC 0 : LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
4304 rhaas 188 ECB :
  189 : /* Get pins. */
4304 rhaas 190 LBC 0 : if (need_to_pin_buffer1)
4304 rhaas 191 UIC 0 : visibilitymap_pin(relation, block1, vmbuffer1);
  192 0 : if (need_to_pin_buffer2)
4304 rhaas 193 LBC 0 : visibilitymap_pin(relation, block2, vmbuffer2);
4304 rhaas 194 ECB :
  195 : /* Relock buffers. */
4304 rhaas 196 UIC 0 : LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
  197 0 : if (buffer2 != InvalidBuffer && buffer2 != buffer1)
  198 0 : LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
4304 rhaas 199 ECB :
  200 : /*
  201 : * If there are two buffers involved and we pinned just one of them,
  202 : * it's possible that the second one became all-visible while we were
3260 bruce 203 : * busy pinning the first one. If it looks like that's a possible
4304 rhaas 204 : * scenario, we'll need to make a second pass through this loop.
  205 : */
4304 rhaas 206 UIC 0 : if (buffer2 == InvalidBuffer || buffer1 == buffer2
  207 0 : || (need_to_pin_buffer1 && need_to_pin_buffer2))
4304 rhaas 208 EUB : break;
  209 : }
  210 :
3 andres 211 GNC 12887067 : return released_locks;
4304 rhaas 212 EUB : }
213 :
  214 : /*
  215 : * Extend the relation. By multiple pages, if beneficial.
  216 : *
  217 : * If the caller needs multiple pages (num_pages > 1), we always try to extend
  218 : * by at least that much.
  219 : *
  220 : * If there is contention on the extension lock, we don't just extend "for
  221 : * ourselves", but we try to help others. We can do so by adding empty pages
  222 : * into the FSM. Typically there is no contention when we can't use the FSM.
  223 : *
  224 : * We do have to limit the number of pages to extend by to some value, as the
  225 : * buffers for all the extended pages need to, temporarily, be pinned. For now
  226 : * we define MAX_BUFFERS_TO_EXTEND_BY to be 64 buffers, it's hard to see
  227 : * benefits with higher numbers. This partially is because copyfrom.c's
  228 : * MAX_BUFFERED_TUPLES / MAX_BUFFERED_BYTES prevents larger multi_inserts.
  229 : *
  230 : * Returns a buffer for a newly extended block. If possible, the buffer is
  231 : * returned exclusively locked. *did_unlock is set to true if the lock had to
  232 : * be released, false otherwise.
  233 : *
  234 : *
  235 : * XXX: It would likely be beneficial for some workloads to extend more
  236 : * aggressively, e.g. using a heuristic based on the relation size.
2557 237 : */
  238 : static Buffer
3 andres 239 GNC 215347 : RelationAddBlocks(Relation relation, BulkInsertState bistate,
  240 : int num_pages, bool use_fsm, bool *did_unlock)
2557 rhaas 241 EUB : {
  242 : #define MAX_BUFFERS_TO_EXTEND_BY 64
  243 : Buffer victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
3 andres 244 GNC 215347 : BlockNumber first_block = InvalidBlockNumber;
  245 215347 : BlockNumber last_block = InvalidBlockNumber;
  246 : uint32 extend_by_pages;
  247 : uint32 not_in_fsm_pages;
  248 : Buffer buffer;
  249 : Page page;
2557 rhaas 250 EUB :
  251 : /*
  252 : * Determine by how many pages to try to extend by.
  253 : */
3 andres 254 GNC 215347 : if (bistate == NULL && !use_fsm)
  255 : {
  256 : /*
  257 : * If we have neither bistate, nor can use the FSM, we can't bulk
  258 : * extend - there'd be no way to find the additional pages.
  259 : */
  260 167 : extend_by_pages = 1;
  261 : }
  262 : else
  263 : {
  264 : uint32 waitcount;
  265 :
  266 : /*
  267 : * Try to extend at least by the number of pages the caller needs. We
  268 : * can remember the additional pages (either via FSM or bistate).
  269 : */
  270 215180 : extend_by_pages = num_pages;
  271 :
  272 215180 : if (!RELATION_IS_LOCAL(relation))
  273 114049 : waitcount = RelationExtensionLockWaiterCount(relation);
  274 : else
  275 101131 : waitcount = 0;
  276 :
  277 : /*
  278 : * Multiply the number of pages to extend by the number of waiters. Do
  279 : * this even if we're not using the FSM, as it still relieves
  280 : * contention, by deferring the next time this backend needs to
  281 : * extend. In that case the extended pages will be found via
  282 : * bistate->next_free.
  283 : */
  284 215180 : extend_by_pages += extend_by_pages * waitcount;
  285 :
  286 : /*
  287 : * Can't extend by more than MAX_BUFFERS_TO_EXTEND_BY, we need to pin
  288 : * them all concurrently.
  289 : */
  290 215180 : extend_by_pages = Min(extend_by_pages, MAX_BUFFERS_TO_EXTEND_BY);
3 andres 291 ECB : }
  292 :
  293 : /*
  294 : * How many of the extended pages should be entered into the FSM?
  295 : *
  296 : * If we have a bistate, only enter pages that we don't need ourselves
  297 : * into the FSM. Otherwise every other backend will immediately try to
  298 : * use the pages this backend needs for itself, causing unnecessary
  299 : * contention. If we don't have a bistate, we can't avoid the FSM.
  300 : *
  301 : * Never enter the page returned into the FSM, we'll immediately use it.
  302 : */
3 andres 303 GNC 215347 : if (num_pages > 1 && bistate == NULL)
  304 248 : not_in_fsm_pages = 1;
  305 : else
  306 215099 : not_in_fsm_pages = num_pages;
  307 :
  308 : /* prepare to put another buffer into the bistate */
  309 215347 : if (bistate && bistate->current_buf != InvalidBuffer)
  310 : {
  311 10440 : ReleaseBuffer(bistate->current_buf);
  312 10440 : bistate->current_buf = InvalidBuffer;
  313 : }
  314 :
  315 : /*
  316 : * Extend the relation. We ask for the first returned page to be locked,
  317 : * so that we are sure that nobody has inserted into the page
  318 : * concurrently.
  319 : *
  320 : * With the current MAX_BUFFERS_TO_EXTEND_BY there's no danger of
  321 : * [auto]vacuum trying to truncate later pages as REL_TRUNCATE_MINIMUM is
  322 : * way larger.
  323 : */
  324 215347 : first_block = ExtendBufferedRelBy(EB_REL(relation), MAIN_FORKNUM,
  325 : bistate ? bistate->strategy : NULL,
  326 : EB_LOCK_FIRST,
  327 : extend_by_pages,
  328 : victim_buffers,
  329 : &extend_by_pages);
    : /* NOTE(review): ExtendBufferedRelBy rewrites extend_by_pages with the
    :  * number of pages actually extended by; all uses below rely on the
    :  * updated value -- confirm against bufmgr.c. */
  330 215347 : buffer = victim_buffers[0]; /* the buffer the function will return */
  331 215347 : last_block = first_block + (extend_by_pages - 1);
  332 215347 : Assert(first_block == BufferGetBlockNumber(buffer));
  333 :
  334 : /*
  335 : * Relation is now extended. Initialize the page. We do this here, before
  336 : * potentially releasing the lock on the page, because it allows us to
  337 : * double check that the page contents are empty (this should never
  338 : * happen, but if it does we don't want to risk wiping out valid data).
  339 : */
  340 215347 : page = BufferGetPage(buffer);
  341 215347 : if (!PageIsNew(page))
3 andres 342 UNC 0 : elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
  343 : first_block,
  344 : RelationGetRelationName(relation));
  345 :
3 andres 346 GNC 215347 : PageInit(page, BufferGetPageSize(buffer), 0);
  347 215347 : MarkBufferDirty(buffer);
  348 :
  349 : /*
  350 : * If we decided to put pages into the FSM, release the buffer lock (but
  351 : * not pin), we don't want to do IO while holding a buffer lock. This will
  352 : * necessitate a bit more extensive checking in our caller.
  353 : */
  354 215347 : if (use_fsm && not_in_fsm_pages < extend_by_pages)
  355 : {
  356 249 : LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  357 249 : *did_unlock = true;
  358 : }
  359 : else
  360 215098 : *did_unlock = false;
  361 :
  362 : /*
  363 : * Relation is now extended. Release pins on all buffers, except for the
  364 : * first (which we'll return). If we decided to put pages into the FSM,
  365 : * we can do that as part of the same loop.
  366 : */
  367 224616 : for (uint32 i = 1; i < extend_by_pages; i++)
  368 : {
  369 9269 : BlockNumber curBlock = first_block + i;
  370 :
  371 9269 : Assert(curBlock == BufferGetBlockNumber(victim_buffers[i]));
  372 9269 : Assert(BlockNumberIsValid(curBlock));
  373 :
  374 9269 : ReleaseBuffer(victim_buffers[i]);
  375 :
  376 9269 : if (use_fsm && i >= not_in_fsm_pages)
  377 : {
  378 1504 : Size freespace = BufferGetPageSize(victim_buffers[i]) -
  379 : SizeOfPageHeaderData;
  380 :
  381 1504 : RecordPageWithFreeSpace(relation, curBlock, freespace);
  382 : }
  383 : }
  384 :
  385 215347 : if (use_fsm && not_in_fsm_pages < extend_by_pages)
  386 : {
  387 249 : BlockNumber first_fsm_block = first_block + not_in_fsm_pages;
  388 :
  389 249 : FreeSpaceMapVacuumRange(relation, first_fsm_block, last_block);
  390 : }
  391 :
  392 215347 : if (bistate)
  393 : {
  394 : /*
  395 : * Remember the additional pages we extended by, so we later can use
  396 : * them without looking into the FSM.
  397 : */
  398 12073 : if (extend_by_pages > 1)
  399 : {
  400 977 : bistate->next_free = first_block + 1;
  401 977 : bistate->last_free = last_block;
  402 : }
  403 : else
  404 : {
  405 11096 : bistate->next_free = InvalidBlockNumber;
  406 11096 : bistate->last_free = InvalidBlockNumber;
  407 : }
  408 :
  409 : /* maintain bistate->current_buf */
  410 12073 : IncrBufferRefCount(buffer);
  411 12073 : bistate->current_buf = buffer;
  412 : }
  413 :
  414 215347 : return buffer;
  415 : #undef MAX_BUFFERS_TO_EXTEND_BY
2557 rhaas 416 ECB : }
417 :
9770 scrappy 418 : /*
8315 vadim4o 419 : * RelationGetBufferForTuple
420 : *
7954 tgl 421 : * Returns pinned and exclusive-locked buffer of a page in given relation
422 : * with free space >= given len.
423 : *
424 : * If num_pages is > 1, we will try to extend the relation by at least that
425 : * many pages when we decide to extend the relation. This is more efficient
426 : * for callers that know they will need multiple pages
427 : * (e.g. heap_multi_insert()).
428 : *
429 : * If otherBuffer is not InvalidBuffer, then it references a previously
430 : * pinned buffer of another page in the same relation; on return, this
431 : * buffer will also be exclusive-locked. (This case is used by heap_update;
432 : * the otherBuffer contains the tuple being updated.)
433 : *
434 : * The reason for passing otherBuffer is that if two backends are doing
435 : * concurrent heap_update operations, a deadlock could occur if they try
436 : * to lock the same two buffers in opposite orders. To ensure that this
437 : * can't happen, we impose the rule that buffers of a relation must be
438 : * locked in increasing page number order. This is most conveniently done
439 : * by having RelationGetBufferForTuple lock them both, with suitable care
440 : * for ordering.
9345 bruce 441 : *
442 : * NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
443 : * same buffer we select for insertion of the new tuple (this could only
444 : * happen if space is freed in that page after heap_update finds there's not
445 : * enough there). In that case, the page will be pinned and locked only once.
446 : *
447 : * We also handle the possibility that the all-visible flag will need to be
448 : * cleared on one or both pages. If so, pin on the associated visibility map
449 : * page must be acquired before acquiring buffer lock(s), to avoid possibly
450 : * doing I/O while holding buffer locks. The pins are passed back to the
451 : * caller using the input-output arguments vmbuffer and vmbuffer_other.
452 : * Note that in some cases the caller might have already acquired such pins,
453 : * which is indicated by these arguments not being InvalidBuffer on entry.
4212 rhaas 454 : *
5050 bruce 455 : * We normally use FSM to help us find free space. However,
456 : * if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
5267 tgl 457 : * the end of the relation if the tuple won't fit on the current target page.
458 : * This can save some cycles when we know the relation is new and doesn't
459 : * contain useful amounts of free space.
6502 460 : *
461 : * HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
462 : * relation, if the caller holds exclusive lock and is careful to invalidate
4807 463 : * relation's smgr_targblock before the first insertion --- that ensures that
464 : * all insertions will occur into newly added pages and not be intermixed
465 : * with tuples from other transactions. That way, a crash can't risk losing
466 : * any committed data of other transactions. (See heap_insert's comments
467 : * for additional constraints needed for safe usage of this behavior.)
468 : *
469 : * The caller can also provide a BulkInsertState object to optimize many
470 : * insertions into the same relation. This keeps a pin on the current
471 : * insertion target page (to save pin/unpin cycles) and also passes a
472 : * BULKWRITE buffer selection strategy object to the buffer manager.
473 : * Passing NULL for bistate selects the default behavior.
474 : *
740 noah 475 : * We don't fill existing pages further than the fillfactor, except for large
476 : * tuples in nearly-empty pages. This is OK since this routine is not
477 : * consulted when updating a tuple and keeping it on the same page, which is
478 : * the scenario fillfactor is meant to reserve space for.
479 : *
480 : * ereport(ERROR) is allowed here, so this routine *must* be called
7998 tgl 481 : * before any (unlogged) changes are made in buffer pool.
9770 scrappy 482 : */
8315 vadim4o 483 : Buffer
7998 tgl 484 GIC 12869513 : RelationGetBufferForTuple(Relation relation, Size len,
485 : Buffer otherBuffer, int options,
486 : BulkInsertState bistate,
487 : Buffer *vmbuffer, Buffer *vmbuffer_other,
488 : int num_pages)
489 : {
5267 490 12869513 : bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
8002 491 12869513 : Buffer buffer = InvalidBuffer;
5383 tgl 492 ECB : Page page;
740 noah 493 : Size nearlyEmptyFreeSpace,
740 noah 494 GBC 12869513 : pageFreeSpace = 0,
740 noah 495 GIC 12869513 : saveFreeSpace = 0,
496 12869513 : targetFreeSpace = 0;
497 : BlockNumber targetBlock,
7954 tgl 498 ECB : otherBlock;
499 : bool unlockedTargetBuffer;
500 : bool recheckVmPins;
501 :
8315 vadim4o 502 GIC 12869513 : len = MAXALIGN(len); /* be conservative */
503 :
504 : /* if the caller doesn't know by how many pages to extend, extend by 1 */
3 andres 505 GNC 12869513 : if (num_pages <= 0)
506 12032039 : num_pages = 1;
507 :
508 : /* Bulk insert is not supported for updates, only inserts. */
5267 tgl 509 GIC 12869513 : Assert(otherBuffer == InvalidBuffer || !bistate);
510 :
8532 tgl 511 ECB : /*
512 : * If we're gonna fail for oversize tuple, do it right away
513 : */
5907 tgl 514 CBC 12869513 : if (len > MaxHeapTupleSize)
7202 tgl 515 UIC 0 : ereport(ERROR,
516 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
3363 tgl 517 ECB : errmsg("row is too big: size %zu, maximum size %zu",
518 : len, MaxHeapTupleSize)));
519 :
520 : /* Compute desired extra freespace due to fillfactor option */
6124 tgl 521 GIC 12869513 : saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
522 : HEAP_DEFAULT_FILLFACTOR);
523 :
740 noah 524 ECB : /*
525 : * Since pages without tuples can still have line pointers, we consider
526 : * pages "empty" when the unavailable space is slight. This threshold is
527 : * somewhat arbitrary, but it should prevent most unnecessary relation
528 : * extensions while inserting large tuples into low-fillfactor tables.
529 : */
740 noah 530 GIC 12869513 : nearlyEmptyFreeSpace = MaxHeapTupleSize -
740 noah 531 ECB : (MaxHeapTuplesPerPage / 8 * sizeof(ItemIdData));
740 noah 532 GIC 12869513 : if (len + saveFreeSpace > nearlyEmptyFreeSpace)
740 noah 533 CBC 36 : targetFreeSpace = Max(len, nearlyEmptyFreeSpace);
534 : else
535 12869477 : targetFreeSpace = len + saveFreeSpace;
536 :
7954 tgl 537 GIC 12869513 : if (otherBuffer != InvalidBuffer)
7954 tgl 538 CBC 196178 : otherBlock = BufferGetBlockNumber(otherBuffer);
539 : else
2118 tgl 540 GIC 12673335 : otherBlock = InvalidBlockNumber; /* just to keep compiler quiet */
541 :
9345 bruce 542 ECB : /*
543 : * We first try to put the tuple on the same page we last inserted a tuple
3260 544 : * on, as cached in the BulkInsertState or relcache entry. If that
545 : * doesn't work, we ask the Free Space Map to locate a suitable page.
5267 tgl 546 : * Since the FSM's info might be out of date, we have to be prepared to
547 : * loop around and retry multiple times. (To insure this isn't an infinite
548 : * loop, we must update the FSM with the correct amount of free space on
549 : * each page that proves not to be suitable.) If the FSM has no record of
550 : * a page with enough free space, we give up and extend the relation.
551 : *
552 : * When use_fsm is false, we either put the tuple onto the existing target
553 : * page or extend the relation.
554 : */
740 noah 555 CBC 12869513 : if (bistate && bistate->current_buf != InvalidBuffer)
5267 tgl 556 GIC 1112439 : targetBlock = BufferGetBlockNumber(bistate->current_buf);
5267 tgl 557 ECB : else
4807 tgl 558 CBC 11757074 : targetBlock = RelationGetTargetBlock(relation);
559 :
6502 tgl 560 GIC 12869513 : if (targetBlock == InvalidBlockNumber && use_fsm)
561 : {
7954 tgl 562 ECB : /*
563 : * We have no cached target page, so ask the FSM for an initial
564 : * target.
565 : */
740 noah 566 GIC 120652 : targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
670 tomas.vondra 567 ECB : }
1433 akapila 568 :
569 : /*
570 : * If the FSM knows nothing of the rel, try the last page before we give
650 andrew 571 : * up and extend. This avoids one-tuple-per-page syndrome during
572 : * bootstrapping or in a recently-started system.
573 : */
670 tomas.vondra 574 GIC 12869513 : if (targetBlock == InvalidBlockNumber)
575 : {
576 111694 : BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
577 :
578 111694 : if (nblocks > 0)
579 87859 : targetBlock = nblocks - 1;
580 : }
581 :
2557 rhaas 582 12869513 : loop:
7954 tgl 583 13086260 : while (targetBlock != InvalidBlockNumber)
584 : {
585 : /*
586 : * Read and exclusive-lock the target block, as well as the other
587 : * block if one was given, taking suitable care with lock ordering and
588 : * the possibility they are the same block.
589 : *
590 : * If the page-level all-visible flag is set, caller will need to
591 : * clear both that and the corresponding visibility map bit. However,
592 : * by the time we return, we'll have x-locked the buffer, and we don't
593 : * want to do any I/O while in that state. So we check the bit here
594 : * before taking the lock, and pin the page if it appears necessary.
595 : * Checking without the lock creates a risk of getting the wrong
596 : * answer, so we'll have to recheck after acquiring the lock.
597 : */
598 12879865 : if (otherBuffer == InvalidBuffer)
599 : {
600 : /* easy case */
1526 andres 601 12680066 : buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
2545 kgrittn 602 12680066 : if (PageIsAllVisible(BufferGetPage(buffer)))
4310 rhaas 603 12083 : visibilitymap_pin(relation, targetBlock, vmbuffer);
604 :
605 : /*
606 : * If the page is empty, pin vmbuffer to set all_frozen bit later.
607 : */
812 tomas.vondra 608 12683540 : if ((options & HEAP_INSERT_FROZEN) &&
609 3474 : (PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0))
610 1542 : visibilitymap_pin(relation, targetBlock, vmbuffer);
611 :
7954 tgl 612 12680066 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
613 : }
614 199799 : else if (otherBlock == targetBlock)
615 : {
616 : /* also easy case */
617 1461 : buffer = otherBuffer;
2545 kgrittn 618 1461 : if (PageIsAllVisible(BufferGetPage(buffer)))
4310 rhaas 619 UIC 0 : visibilitymap_pin(relation, targetBlock, vmbuffer);
7954 tgl 620 GIC 1461 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
621 : }
622 198338 : else if (otherBlock < targetBlock)
623 : {
624 : /* lock other buffer first */
625 194756 : buffer = ReadBuffer(relation, targetBlock);
2545 kgrittn 626 194756 : if (PageIsAllVisible(BufferGetPage(buffer)))
4310 rhaas 627 323 : visibilitymap_pin(relation, targetBlock, vmbuffer);
7954 tgl 628 194756 : LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
7998 629 194756 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
630 : }
631 : else
632 : {
633 : /* lock target buffer first */
7954 634 3582 : buffer = ReadBuffer(relation, targetBlock);
2545 kgrittn 635 3582 : if (PageIsAllVisible(BufferGetPage(buffer)))
4310 rhaas 636 40 : visibilitymap_pin(relation, targetBlock, vmbuffer);
7954 tgl 637 3582 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
638 3582 : LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
639 : }
640 :
4310 rhaas 641 ECB : /*
642 : * We now have the target page (and the other buffer, if any) pinned
643 : * and locked. However, since our initial PageIsAllVisible checks
644 : * were performed before acquiring the lock, the results might now be
645 : * out of date, either for the selected victim buffer, or for the
646 : * other buffer passed by the caller. In that case, we'll need to
3955 bruce 647 : * give up our locks, go get the pin(s) we failed to get earlier, and
4304 rhaas 648 : * re-lock. That's pretty painful, but hopefully shouldn't happen
649 : * often.
650 : *
3955 bruce 651 : * Note that there's a small possibility that we didn't pin the page
652 : * above but still have the correct page pinned anyway, either because
653 : * we've already made a previous pass through this loop, or because
654 : * caller passed us the right page anyway.
655 : *
656 : * Note also that it's possible that by the time we get the pin and
657 : * retake the buffer locks, the visibility map bit will have been
658 : * cleared by some other backend anyway. In that case, we'll have
659 : * done a bit of extra work for no gain, but there's no real harm
660 : * done.
661 : */
3 andres 662 GNC 12879865 : GetVisibilityMapPins(relation, buffer, otherBuffer,
663 : targetBlock, otherBlock, vmbuffer,
664 : vmbuffer_other);
665 :
7954 tgl 666 ECB : /*
6385 bruce 667 EUB : * Now we can check to see if there's enough free space here. If so,
668 : * we're done.
669 : */
2545 kgrittn 670 GIC 12879865 : page = BufferGetPage(buffer);
671 :
672 : /*
1526 andres 673 ECB : * If necessary initialize page, it'll be used soon. We could avoid
674 : * dirtying the buffer here, and rely on the caller to do so whenever
675 : * it puts a tuple onto the page, but there seems not much benefit in
676 : * doing so.
677 : */
1526 andres 678 GIC 12879865 : if (PageIsNew(page))
679 : {
680 9265 : PageInit(page, BufferGetPageSize(buffer), 0);
681 9265 : MarkBufferDirty(buffer);
1526 andres 682 ECB : }
683 :
5383 tgl 684 CBC 12879865 : pageFreeSpace = PageGetHeapFreeSpace(page);
740 noah 685 12879865 : if (targetFreeSpace <= pageFreeSpace)
686 : {
7954 tgl 687 ECB : /* use this page as future insert target, too */
4807 tgl 688 GIC 12654166 : RelationSetTargetBlock(relation, targetBlock);
7954 tgl 689 CBC 12654166 : return buffer;
7954 tgl 690 ECB : }
691 :
692 : /*
693 : * Not enough space, so we must give up our page locks and pin (if
694 : * any) and prepare to look elsewhere. We don't care which order we
695 : * unlock the two buffers in, so this can be slightly simpler than the
696 : * code above.
697 : */
7954 tgl 698 GIC 225699 : LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
699 225699 : if (otherBuffer == InvalidBuffer)
700 215235 : ReleaseBuffer(buffer);
701 10464 : else if (otherBlock != targetBlock)
702 : {
703 9003 : LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
704 9003 : ReleaseBuffer(buffer);
705 : }
706 :
707 : /* Is there an ongoing bulk extension? */
3 andres 708 GNC 225699 : if (bistate && bistate->next_free != InvalidBlockNumber)
709 : {
710 7765 : Assert(bistate->next_free <= bistate->last_free);
2557 rhaas 711 ECB :
712 : /*
713 : * We bulk extended the relation before, and there are still some
714 : * unused pages from that extension, so we don't need to look in
715 : * the FSM for a new page. But do record the free space from the
716 : * last page, somebody might insert narrower tuples later.
717 : */
3 andres 718 GNC 7765 : if (use_fsm)
719 6216 : RecordPageWithFreeSpace(relation, targetBlock, pageFreeSpace);
720 :
721 7765 : targetBlock = bistate->next_free;
722 7765 : if (bistate->next_free >= bistate->last_free)
723 : {
724 977 : bistate->next_free = InvalidBlockNumber;
725 977 : bistate->last_free = InvalidBlockNumber;
726 : }
727 : else
728 6788 : bistate->next_free++;
729 : }
730 217934 : else if (!use_fsm)
731 : {
732 : /* Without FSM, always fall out of the loop and extend */
733 8952 : break;
734 : }
735 : else
736 : {
737 : /*
738 : * Update FSM as to condition of this page, and ask for another
739 : * page to try.
740 : */
741 208982 : targetBlock = RecordAndGetPageWithFreeSpace(relation,
742 : targetBlock,
743 : pageFreeSpace,
744 : targetFreeSpace);
745 : }
746 : }
747 :
748 : /* Have to extend the relation */
749 215347 : buffer = RelationAddBlocks(relation, bistate, num_pages, use_fsm,
750 : &unlockedTargetBuffer);
751 :
752 215347 : targetBlock = BufferGetBlockNumber(buffer);
1526 753 215347 : page = BufferGetPage(buffer);
8002 tgl 754 ECB :
812 tomas.vondra 755 : /*
756 : * The page is empty, pin vmbuffer to set all_frozen bit. We don't want to
757 : * do IO while the buffer is locked, so we unlock the page first if IO is
758 : * needed (necessitating checks below).
759 : */
812 tomas.vondra 760 GNC 215347 : if (options & HEAP_INSERT_FROZEN)
761 : {
3 andres 762 211 : Assert(PageGetMaxOffsetNumber(page) == 0);
763 :
764 211 : if (!visibilitymap_pin_ok(targetBlock, *vmbuffer))
765 : {
766 112 : if (!unlockedTargetBuffer)
767 112 : LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
768 112 : unlockedTargetBuffer = true;
769 112 : visibilitymap_pin(relation, targetBlock, vmbuffer);
3 andres 770 ECB : }
771 : }
812 tomas.vondra 772 :
773 : /*
774 : * Reacquire locks if necessary.
1526 andres 775 : *
776 : * If the target buffer was unlocked above, or is unlocked while
777 : * reacquiring the lock on otherBuffer below, it's unlikely, but possible,
778 : * that another backend used space on this page. We check for that below,
779 : * and retry if necessary.
780 : */
3 andres 781 GNC 215347 : recheckVmPins = false;
782 215347 : if (unlockedTargetBuffer)
783 : {
784 : /* released lock on target buffer above */
785 361 : if (otherBuffer != InvalidBuffer)
786 2 : LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
787 361 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
788 361 : recheckVmPins = true;
789 : }
790 214986 : else if (otherBuffer != InvalidBuffer)
791 : {
792 : /*
793 : * We did not release the target buffer, and otherBuffer is valid,
794 : * need to lock the other buffer. It's guaranteed to be of a lower
795 : * page number than the new page. To conform with the deadlock
796 : * prevent rules, we ought to lock otherBuffer first, but that would
797 : * give other backends a chance to put tuples on our page. To reduce
798 : * the likelihood of that, attempt to lock the other buffer
799 : * conditionally, that's very likely to work.
800 : *
801 : * Alternatively, we could acquire the lock on otherBuffer before
802 : * extending the relation, but that'd require holding the lock while
803 : * performing IO, which seems worse than an unlikely retry.
804 : */
1526 andres 805 GIC 6841 : Assert(otherBuffer != buffer);
726 tgl 806 6841 : Assert(targetBlock > otherBlock);
807 :
1526 andres 808 6841 : if (unlikely(!ConditionalLockBuffer(otherBuffer)))
809 : {
3 andres 810 UNC 0 : unlockedTargetBuffer = true;
1526 andres 811 LBC 0 : LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1526 andres 812 UIC 0 : LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
1526 andres 813 LBC 0 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
191 tgl 814 ECB : }
3 andres 815 GNC 6841 : recheckVmPins = true;
816 : }
817 :
818 : /*
819 : * If one of the buffers was unlocked (always the case if otherBuffer is
820 : * valid), it's possible, although unlikely, that an all-visible flag
821 : * became set. We can use GetVisibilityMapPins to deal with that. It's
822 : * possible that GetVisibilityMapPins() might need to temporarily release
823 : * buffer locks, in which case we'll need to check if there's still enough
824 : * space on the page below.
825 : */
826 215347 : if (recheckVmPins)
827 : {
828 7202 : if (GetVisibilityMapPins(relation, otherBuffer, buffer,
829 : otherBlock, targetBlock, vmbuffer_other,
830 : vmbuffer))
3 andres 831 UNC 0 : unlockedTargetBuffer = true;
832 : }
833 :
834 : /*
835 : * If the target buffer was temporarily unlocked since the relation
836 : * extension, it's possible, although unlikely, that all the space on the
837 : * page was already used. If so, we just retry from the start. If we
838 : * didn't unlock, something has gone wrong if there's not enough space -
839 : * the test at the top should have prevented reaching this case.
840 : */
3 andres 841 GNC 215347 : pageFreeSpace = PageGetHeapFreeSpace(page);
842 215347 : if (len > pageFreeSpace)
843 : {
3 andres 844 UNC 0 : if (unlockedTargetBuffer)
191 tgl 845 ECB : {
3 andres 846 UNC 0 : if (otherBuffer != InvalidBuffer)
847 0 : LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
191 tgl 848 LBC 0 : UnlockReleaseBuffer(buffer);
1526 andres 849 ECB :
191 tgl 850 UIC 0 : goto loop;
851 : }
3363 852 0 : elog(PANIC, "tuple is too big: size %zu", len);
853 : }
854 :
855 : /*
856 : * Remember the new page as our target for future insertions.
857 : *
858 : * XXX should we enter the new page into the free space map immediately,
6347 bruce 859 ECB : * or just keep it for this backend's exclusive use in the short run
860 : * (until VACUUM sees it)? Seems to depend on whether you expect the
861 : * current backend to make more insertions or not, which is probably a
862 : * good bet most of the time. So for now, don't add it to FSM yet.
7954 tgl 863 : */
3 andres 864 GNC 215347 : RelationSetTargetBlock(relation, targetBlock);
7954 tgl 865 ECB :
8002 tgl 866 CBC 215347 : return buffer;
867 : }
|