TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgxlog.c
4 : * WAL replay logic for SP-GiST
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgxlog.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/bufmask.h"
18 : #include "access/spgist_private.h"
19 : #include "access/spgxlog.h"
20 : #include "access/transam.h"
21 : #include "access/xlog.h"
22 : #include "access/xlogutils.h"
23 : #include "storage/standby.h"
24 : #include "utils/memutils.h"
25 :
26 :
27 : static MemoryContext opCtx; /* scratch memory for replay: created in spg_xlog_startup(), made current and then reset for every record by spg_redo(), deleted in spg_xlog_cleanup() */
28 :
29 :
30 : /*
31 : * Prepare a dummy SpGistState, with just the minimum info needed for replay.
32 : *
33 : * At present, all we need is enough info to support spgFormDeadTuple(),
34 : * plus the isBuild flag.
35 : */
36 : static void
37 CBC 408 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
38 : {
39 408 : memset(state, 0, sizeof(*state));
40 :
41 408 : state->myXid = stateSrc.myXid;
42 408 : state->isBuild = stateSrc.isBuild;
43 408 : state->deadTupleStorage = palloc0(SGDTSIZE);
44 408 : }
45 :
46 : /*
47 : * Add a leaf tuple, or replace an existing placeholder tuple. This is used
48 : * to replay SpGistPageAddNewItem() operations. If the offset points at an
49 : * existing tuple, it had better be a placeholder tuple.
50 : */
51 : static void
52 70991 : addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
53 : {
54 70991 : if (offset <= PageGetMaxOffsetNumber(page))
55 : {
56 19252 : SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
57 : PageGetItemId(page, offset));
58 :
59 19252 : if (dt->tupstate != SPGIST_PLACEHOLDER)
60 UBC 0 : elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
61 :
62 CBC 19252 : Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
63 19252 : SpGistPageGetOpaque(page)->nPlaceholder--;
64 :
65 19252 : PageIndexTupleDelete(page, offset);
66 : }
67 :
68 70991 : Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
69 :
70 70991 : if (PageAddItem(page, tuple, size, offset, false, false) != offset)
71 UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
72 : size);
73 CBC 70991 : }
74 :
75 : static void
76 38967 : spgRedoAddLeaf(XLogReaderState *record)
77 : {
78 38967 : XLogRecPtr lsn = record->EndRecPtr;
79 38967 : char *ptr = XLogRecGetData(record);
80 38967 : spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
81 : char *leafTuple;
82 : SpGistLeafTupleData leafTupleHdr;
83 : Buffer buffer;
84 : Page page;
85 : XLogRedoAction action;
86 :
87 38967 : ptr += sizeof(spgxlogAddLeaf);
88 38967 : leafTuple = ptr;
89 : /* the leaf tuple is unaligned, so make a copy to access its header */
90 38967 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
91 :
92 : /*
93 : * In normal operation we would have both current and parent pages locked
94 : * simultaneously; but in WAL replay it should be safe to update the leaf
95 : * page before updating the parent.
96 : */
97 38967 : if (xldata->newPage)
98 : {
99 UBC 0 : buffer = XLogInitBufferForRedo(record, 0);
100 0 : SpGistInitBuffer(buffer,
101 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
102 0 : action = BLK_NEEDS_REDO;
103 : }
104 : else
105 CBC 38967 : action = XLogReadBufferForRedo(record, 0, &buffer);
106 :
107 38967 : if (action == BLK_NEEDS_REDO)
108 : {
109 38952 : page = BufferGetPage(buffer);
110 :
111 : /* insert new tuple */
112 38952 : if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
113 : {
114 : /* normal cases, tuple was added by SpGistPageAddNewItem */
115 38952 : addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
116 38952 : xldata->offnumLeaf);
117 :
118 : /* update head tuple's chain link if needed */
119 38952 : if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
120 : {
121 : SpGistLeafTuple head;
122 :
123 38311 : head = (SpGistLeafTuple) PageGetItem(page,
124 38311 : PageGetItemId(page, xldata->offnumHeadLeaf));
125 38311 : Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
126 38311 : SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
127 : }
128 : }
129 : else
130 : {
131 : /* replacing a DEAD tuple */
132 UBC 0 : PageIndexTupleDelete(page, xldata->offnumLeaf);
133 0 : if (PageAddItem(page,
134 : (Item) leafTuple, leafTupleHdr.size,
135 0 : xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
136 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
137 : leafTupleHdr.size);
138 : }
139 :
140 CBC 38952 : PageSetLSN(page, lsn);
141 38952 : MarkBufferDirty(buffer);
142 : }
143 38967 : if (BufferIsValid(buffer))
144 38967 : UnlockReleaseBuffer(buffer);
145 :
146 : /* update parent downlink if necessary */
147 38967 : if (xldata->offnumParent != InvalidOffsetNumber)
148 : {
149 120 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
150 : {
151 : SpGistInnerTuple tuple;
152 : BlockNumber blknoLeaf;
153 :
154 120 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
155 :
156 120 : page = BufferGetPage(buffer);
157 :
158 120 : tuple = (SpGistInnerTuple) PageGetItem(page,
159 120 : PageGetItemId(page, xldata->offnumParent));
160 :
161 120 : spgUpdateNodeLink(tuple, xldata->nodeI,
162 120 : blknoLeaf, xldata->offnumLeaf);
163 :
164 120 : PageSetLSN(page, lsn);
165 120 : MarkBufferDirty(buffer);
166 : }
167 120 : if (BufferIsValid(buffer))
168 120 : UnlockReleaseBuffer(buffer);
169 : }
170 38967 : }
171 :
172 : static void
173 77 : spgRedoMoveLeafs(XLogReaderState *record)
174 : {
175 77 : XLogRecPtr lsn = record->EndRecPtr;
176 77 : char *ptr = XLogRecGetData(record);
177 77 : spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
178 : SpGistState state;
179 : OffsetNumber *toDelete;
180 : OffsetNumber *toInsert;
181 : int nInsert;
182 : Buffer buffer;
183 : Page page;
184 : XLogRedoAction action;
185 : BlockNumber blknoDst;
186 :
187 77 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
188 :
189 77 : fillFakeState(&state, xldata->stateSrc);
190 :
191 77 : nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
192 :
193 77 : ptr += SizeOfSpgxlogMoveLeafs;
194 77 : toDelete = (OffsetNumber *) ptr;
195 77 : ptr += sizeof(OffsetNumber) * xldata->nMoves;
196 77 : toInsert = (OffsetNumber *) ptr;
197 77 : ptr += sizeof(OffsetNumber) * nInsert;
198 :
199 : /* now ptr points to the list of leaf tuples */
200 :
201 : /*
202 : * In normal operation we would have all three pages (source, dest, and
203 : * parent) locked simultaneously; but in WAL replay it should be safe to
204 : * update them one at a time, as long as we do it in the right order.
205 : */
206 :
207 : /* Insert tuples on the dest page (do first, so redirect is valid) */
208 77 : if (xldata->newPage)
209 : {
210 32 : buffer = XLogInitBufferForRedo(record, 1);
211 32 : SpGistInitBuffer(buffer,
212 32 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
213 32 : action = BLK_NEEDS_REDO;
214 : }
215 : else
216 45 : action = XLogReadBufferForRedo(record, 1, &buffer);
217 :
218 77 : if (action == BLK_NEEDS_REDO)
219 : {
220 : int i;
221 :
222 77 : page = BufferGetPage(buffer);
223 :
224 3401 : for (i = 0; i < nInsert; i++)
225 : {
226 : char *leafTuple;
227 : SpGistLeafTupleData leafTupleHdr;
228 :
229 : /*
230 : * the tuples are not aligned, so must copy to access the size
231 : * field.
232 : */
233 3324 : leafTuple = ptr;
234 3324 : memcpy(&leafTupleHdr, leafTuple,
235 : sizeof(SpGistLeafTupleData));
236 :
237 3324 : addOrReplaceTuple(page, (Item) leafTuple,
238 3324 : leafTupleHdr.size, toInsert[i]);
239 3324 : ptr += leafTupleHdr.size;
240 : }
241 :
242 77 : PageSetLSN(page, lsn);
243 77 : MarkBufferDirty(buffer);
244 : }
245 77 : if (BufferIsValid(buffer))
246 77 : UnlockReleaseBuffer(buffer);
247 :
248 : /* Delete tuples from the source page, inserting a redirection pointer */
249 77 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
250 : {
251 77 : page = BufferGetPage(buffer);
252 :
253 77 : spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
254 77 : state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
255 : SPGIST_PLACEHOLDER,
256 : blknoDst,
257 77 : toInsert[nInsert - 1]);
258 :
259 77 : PageSetLSN(page, lsn);
260 77 : MarkBufferDirty(buffer);
261 : }
262 77 : if (BufferIsValid(buffer))
263 77 : UnlockReleaseBuffer(buffer);
264 :
265 : /* And update the parent downlink */
266 77 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
267 : {
268 : SpGistInnerTuple tuple;
269 :
270 77 : page = BufferGetPage(buffer);
271 :
272 77 : tuple = (SpGistInnerTuple) PageGetItem(page,
273 77 : PageGetItemId(page, xldata->offnumParent));
274 :
275 77 : spgUpdateNodeLink(tuple, xldata->nodeI,
276 77 : blknoDst, toInsert[nInsert - 1]);
277 :
278 77 : PageSetLSN(page, lsn);
279 77 : MarkBufferDirty(buffer);
280 : }
281 77 : if (BufferIsValid(buffer))
282 77 : UnlockReleaseBuffer(buffer);
283 77 : }
284 :
285 : static void
286 101 : spgRedoAddNode(XLogReaderState *record)
287 : {
288 101 : XLogRecPtr lsn = record->EndRecPtr;
289 101 : char *ptr = XLogRecGetData(record);
290 101 : spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
291 : char *innerTuple;
292 : SpGistInnerTupleData innerTupleHdr;
293 : SpGistState state;
294 : Buffer buffer;
295 : Page page;
296 : XLogRedoAction action;
297 :
298 101 : ptr += sizeof(spgxlogAddNode);
299 101 : innerTuple = ptr;
300 : /* the tuple is unaligned, so make a copy to access its header */
301 101 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
302 :
303 101 : fillFakeState(&state, xldata->stateSrc);
304 :
305 101 : if (!XLogRecHasBlockRef(record, 1))
306 : {
307 : /* update in place */
308 100 : Assert(xldata->parentBlk == -1);
309 100 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
310 : {
311 100 : page = BufferGetPage(buffer);
312 :
313 100 : PageIndexTupleDelete(page, xldata->offnum);
314 100 : if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
315 : xldata->offnum,
316 100 : false, false) != xldata->offnum)
317 UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
318 : innerTupleHdr.size);
319 :
320 CBC 100 : PageSetLSN(page, lsn);
321 100 : MarkBufferDirty(buffer);
322 : }
323 100 : if (BufferIsValid(buffer))
324 100 : UnlockReleaseBuffer(buffer);
325 : }
326 : else
327 : {
328 : BlockNumber blkno;
329 : BlockNumber blknoNew;
330 :
331 1 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
332 1 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
333 :
334 : /*
335 : * In normal operation we would have all three pages (source, dest,
336 : * and parent) locked simultaneously; but in WAL replay it should be
337 : * safe to update them one at a time, as long as we do it in the right
338 : * order. We must insert the new tuple before replacing the old tuple
339 : * with the redirect tuple.
340 : */
341 :
342 : /* Install new tuple first so redirect is valid */
343 1 : if (xldata->newPage)
344 : {
345 : /* AddNode is not used for nulls pages */
346 1 : buffer = XLogInitBufferForRedo(record, 1);
347 1 : SpGistInitBuffer(buffer, 0);
348 1 : action = BLK_NEEDS_REDO;
349 : }
350 : else
351 UBC 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
352 CBC 1 : if (action == BLK_NEEDS_REDO)
353 : {
354 1 : page = BufferGetPage(buffer);
355 :
356 1 : addOrReplaceTuple(page, (Item) innerTuple,
357 1 : innerTupleHdr.size, xldata->offnumNew);
358 :
359 : /*
360 : * If parent is in this same page, update it now.
361 : */
362 1 : if (xldata->parentBlk == 1)
363 : {
364 : SpGistInnerTuple parentTuple;
365 :
366 UBC 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
367 0 : PageGetItemId(page, xldata->offnumParent));
368 :
369 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
370 0 : blknoNew, xldata->offnumNew);
371 : }
372 CBC 1 : PageSetLSN(page, lsn);
373 1 : MarkBufferDirty(buffer);
374 : }
375 1 : if (BufferIsValid(buffer))
376 1 : UnlockReleaseBuffer(buffer);
377 :
378 : /* Delete old tuple, replacing it with redirect or placeholder tuple */
379 1 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
380 : {
381 : SpGistDeadTuple dt;
382 :
383 1 : page = BufferGetPage(buffer);
384 :
385 1 : if (state.isBuild)
386 UBC 0 : dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
387 : InvalidBlockNumber,
388 : InvalidOffsetNumber);
389 : else
390 CBC 1 : dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
391 : blknoNew,
392 1 : xldata->offnumNew);
393 :
394 1 : PageIndexTupleDelete(page, xldata->offnum);
395 1 : if (PageAddItem(page, (Item) dt, dt->size,
396 : xldata->offnum,
397 1 : false, false) != xldata->offnum)
398 UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
399 : dt->size);
400 :
401 CBC 1 : if (state.isBuild)
402 UBC 0 : SpGistPageGetOpaque(page)->nPlaceholder++;
403 : else
404 CBC 1 : SpGistPageGetOpaque(page)->nRedirection++;
405 :
406 : /*
407 : * If parent is in this same page, update it now.
408 : */
409 1 : if (xldata->parentBlk == 0)
410 : {
411 : SpGistInnerTuple parentTuple;
412 :
413 UBC 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
414 0 : PageGetItemId(page, xldata->offnumParent));
415 :
416 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
417 0 : blknoNew, xldata->offnumNew);
418 : }
419 CBC 1 : PageSetLSN(page, lsn);
420 1 : MarkBufferDirty(buffer);
421 : }
422 1 : if (BufferIsValid(buffer))
423 1 : UnlockReleaseBuffer(buffer);
424 :
425 : /*
426 : * Update parent downlink (if we didn't do it as part of the source or
427 : * destination page update already).
428 : */
429 1 : if (xldata->parentBlk == 2)
430 : {
431 1 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
432 : {
433 : SpGistInnerTuple parentTuple;
434 :
435 1 : page = BufferGetPage(buffer);
436 :
437 1 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
438 1 : PageGetItemId(page, xldata->offnumParent));
439 :
440 1 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
441 1 : blknoNew, xldata->offnumNew);
442 :
443 1 : PageSetLSN(page, lsn);
444 1 : MarkBufferDirty(buffer);
445 : }
446 1 : if (BufferIsValid(buffer))
447 1 : UnlockReleaseBuffer(buffer);
448 : }
449 : }
450 101 : }
451 :
452 : static void
453 101 : spgRedoSplitTuple(XLogReaderState *record)
454 : {
455 101 : XLogRecPtr lsn = record->EndRecPtr;
456 101 : char *ptr = XLogRecGetData(record);
457 101 : spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
458 : char *prefixTuple;
459 : SpGistInnerTupleData prefixTupleHdr;
460 : char *postfixTuple;
461 : SpGistInnerTupleData postfixTupleHdr;
462 : Buffer buffer;
463 : Page page;
464 : XLogRedoAction action;
465 :
466 101 : ptr += sizeof(spgxlogSplitTuple);
467 101 : prefixTuple = ptr;
468 : /* the prefix tuple is unaligned, so make a copy to access its header */
469 101 : memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
470 101 : ptr += prefixTupleHdr.size;
471 101 : postfixTuple = ptr;
472 : /* postfix tuple is also unaligned */
473 101 : memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
474 :
475 : /*
476 : * In normal operation we would have both pages locked simultaneously; but
477 : * in WAL replay it should be safe to update them one at a time, as long
478 : * as we do it in the right order.
479 : */
480 :
481 : /* insert postfix tuple first to avoid dangling link */
482 101 : if (!xldata->postfixBlkSame)
483 : {
484 27 : if (xldata->newPage)
485 : {
486 1 : buffer = XLogInitBufferForRedo(record, 1);
487 : /* SplitTuple is not used for nulls pages */
488 1 : SpGistInitBuffer(buffer, 0);
489 1 : action = BLK_NEEDS_REDO;
490 : }
491 : else
492 26 : action = XLogReadBufferForRedo(record, 1, &buffer);
493 27 : if (action == BLK_NEEDS_REDO)
494 : {
495 27 : page = BufferGetPage(buffer);
496 :
497 27 : addOrReplaceTuple(page, (Item) postfixTuple,
498 27 : postfixTupleHdr.size, xldata->offnumPostfix);
499 :
500 27 : PageSetLSN(page, lsn);
501 27 : MarkBufferDirty(buffer);
502 : }
503 27 : if (BufferIsValid(buffer))
504 27 : UnlockReleaseBuffer(buffer);
505 : }
506 :
507 : /* now handle the original page */
508 101 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
509 : {
510 101 : page = BufferGetPage(buffer);
511 :
512 101 : PageIndexTupleDelete(page, xldata->offnumPrefix);
513 101 : if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
514 101 : xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
515 UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
516 : prefixTupleHdr.size);
517 :
518 CBC 101 : if (xldata->postfixBlkSame)
519 74 : addOrReplaceTuple(page, (Item) postfixTuple,
520 74 : postfixTupleHdr.size,
521 74 : xldata->offnumPostfix);
522 :
523 101 : PageSetLSN(page, lsn);
524 101 : MarkBufferDirty(buffer);
525 : }
526 101 : if (BufferIsValid(buffer))
527 101 : UnlockReleaseBuffer(buffer);
528 101 : }
529 :
530 : static void
531 204 : spgRedoPickSplit(XLogReaderState *record)
532 : {
533 204 : XLogRecPtr lsn = record->EndRecPtr;
534 204 : char *ptr = XLogRecGetData(record);
535 204 : spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
536 : char *innerTuple;
537 : SpGistInnerTupleData innerTupleHdr;
538 : SpGistState state;
539 : OffsetNumber *toDelete;
540 : OffsetNumber *toInsert;
541 : uint8 *leafPageSelect;
542 : Buffer srcBuffer;
543 : Buffer destBuffer;
544 : Buffer innerBuffer;
545 : Page srcPage;
546 : Page destPage;
547 : Page page;
548 : int i;
549 : BlockNumber blknoInner;
550 : XLogRedoAction action;
551 :
552 204 : XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
553 :
554 204 : fillFakeState(&state, xldata->stateSrc);
555 :
556 204 : ptr += SizeOfSpgxlogPickSplit;
557 204 : toDelete = (OffsetNumber *) ptr;
558 204 : ptr += sizeof(OffsetNumber) * xldata->nDelete;
559 204 : toInsert = (OffsetNumber *) ptr;
560 204 : ptr += sizeof(OffsetNumber) * xldata->nInsert;
561 204 : leafPageSelect = (uint8 *) ptr;
562 204 : ptr += sizeof(uint8) * xldata->nInsert;
563 :
564 204 : innerTuple = ptr;
565 : /* the inner tuple is unaligned, so make a copy to access its header */
566 204 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
567 204 : ptr += innerTupleHdr.size;
568 :
569 : /* now ptr points to the list of leaf tuples */
570 :
571 204 : if (xldata->isRootSplit)
572 : {
573 : /* when splitting root, we touch it only in the guise of new inner */
574 3 : srcBuffer = InvalidBuffer;
575 3 : srcPage = NULL;
576 : }
577 201 : else if (xldata->initSrc)
578 : {
579 : /* just re-init the source page */
580 UBC 0 : srcBuffer = XLogInitBufferForRedo(record, 0);
581 0 : srcPage = (Page) BufferGetPage(srcBuffer);
582 :
583 0 : SpGistInitBuffer(srcBuffer,
584 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
585 : /* don't update LSN etc till we're done with it */
586 : }
587 : else
588 : {
589 : /*
590 : * Delete the specified tuples from source page. (In case we're in
591 : * Hot Standby, we need to hold lock on the page till we're done
592 : * inserting leaf tuples and the new inner tuple, else the added
593 : * redirect tuple will be a dangling link.)
594 : */
595 CBC 201 : srcPage = NULL;
596 201 : if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
597 : {
598 201 : srcPage = BufferGetPage(srcBuffer);
599 :
600 : /*
601 : * We have it a bit easier here than in doPickSplit(), because we
602 : * know the inner tuple's location already, so we can inject the
603 : * correct redirection tuple now.
604 : */
605 201 : if (!state.isBuild)
606 201 : spgPageIndexMultiDelete(&state, srcPage,
607 201 : toDelete, xldata->nDelete,
608 : SPGIST_REDIRECT,
609 : SPGIST_PLACEHOLDER,
610 : blknoInner,
611 201 : xldata->offnumInner);
612 : else
613 UBC 0 : spgPageIndexMultiDelete(&state, srcPage,
614 0 : toDelete, xldata->nDelete,
615 : SPGIST_PLACEHOLDER,
616 : SPGIST_PLACEHOLDER,
617 : InvalidBlockNumber,
618 : InvalidOffsetNumber);
619 :
620 : /* don't update LSN etc till we're done with it */
621 : }
622 : }
623 :
624 : /* try to access dest page if any */
625 CBC 204 : if (!XLogRecHasBlockRef(record, 1))
626 : {
627 UBC 0 : destBuffer = InvalidBuffer;
628 0 : destPage = NULL;
629 : }
630 CBC 204 : else if (xldata->initDest)
631 : {
632 : /* just re-init the dest page */
633 190 : destBuffer = XLogInitBufferForRedo(record, 1);
634 190 : destPage = (Page) BufferGetPage(destBuffer);
635 :
636 190 : SpGistInitBuffer(destBuffer,
637 190 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
638 : /* don't update LSN etc till we're done with it */
639 : }
640 : else
641 : {
642 : /*
643 : * We could probably release the page lock immediately in the
644 : * full-page-image case, but for safety let's hold it till later.
645 : */
646 14 : if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
647 14 : destPage = (Page) BufferGetPage(destBuffer);
648 : else
649 UBC 0 : destPage = NULL; /* don't do any page updates */
650 : }
651 :
652 : /* restore leaf tuples to src and/or dest page */
653 CBC 28614 : for (i = 0; i < xldata->nInsert; i++)
654 : {
655 : char *leafTuple;
656 : SpGistLeafTupleData leafTupleHdr;
657 :
658 : /* the tuples are not aligned, so must copy to access the size field. */
659 28410 : leafTuple = ptr;
660 28410 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
661 28410 : ptr += leafTupleHdr.size;
662 :
663 28410 : page = leafPageSelect[i] ? destPage : srcPage;
664 28410 : if (page == NULL)
665 UBC 0 : continue; /* no need to touch this page */
666 :
667 CBC 28410 : addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
668 28410 : toInsert[i]);
669 : }
670 :
671 : /* Now update src and dest page LSNs if needed */
672 204 : if (srcPage != NULL)
673 : {
674 201 : PageSetLSN(srcPage, lsn);
675 201 : MarkBufferDirty(srcBuffer);
676 : }
677 204 : if (destPage != NULL)
678 : {
679 204 : PageSetLSN(destPage, lsn);
680 204 : MarkBufferDirty(destBuffer);
681 : }
682 :
683 : /* restore new inner tuple */
684 204 : if (xldata->initInner)
685 : {
686 6 : innerBuffer = XLogInitBufferForRedo(record, 2);
687 6 : SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
688 6 : action = BLK_NEEDS_REDO;
689 : }
690 : else
691 198 : action = XLogReadBufferForRedo(record, 2, &innerBuffer);
692 :
693 204 : if (action == BLK_NEEDS_REDO)
694 : {
695 203 : page = BufferGetPage(innerBuffer);
696 :
697 203 : addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
698 203 : xldata->offnumInner);
699 :
700 : /* if inner is also parent, update link while we're here */
701 203 : if (xldata->innerIsParent)
702 : {
703 : SpGistInnerTuple parent;
704 :
705 185 : parent = (SpGistInnerTuple) PageGetItem(page,
706 185 : PageGetItemId(page, xldata->offnumParent));
707 185 : spgUpdateNodeLink(parent, xldata->nodeI,
708 185 : blknoInner, xldata->offnumInner);
709 : }
710 :
711 203 : PageSetLSN(page, lsn);
712 203 : MarkBufferDirty(innerBuffer);
713 : }
714 204 : if (BufferIsValid(innerBuffer))
715 204 : UnlockReleaseBuffer(innerBuffer);
716 :
717 : /*
718 : * Now we can release the leaf-page locks. It's okay to do this before
719 : * updating the parent downlink.
720 : */
721 204 : if (BufferIsValid(srcBuffer))
722 201 : UnlockReleaseBuffer(srcBuffer);
723 204 : if (BufferIsValid(destBuffer))
724 204 : UnlockReleaseBuffer(destBuffer);
725 :
726 : /* update parent downlink, unless we did it above */
727 204 : if (XLogRecHasBlockRef(record, 3))
728 15 : {
729 : Buffer parentBuffer;
730 :
731 15 : if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
732 : {
733 : SpGistInnerTuple parent;
734 :
735 15 : page = BufferGetPage(parentBuffer);
736 :
737 15 : parent = (SpGistInnerTuple) PageGetItem(page,
738 15 : PageGetItemId(page, xldata->offnumParent));
739 15 : spgUpdateNodeLink(parent, xldata->nodeI,
740 15 : blknoInner, xldata->offnumInner);
741 :
742 15 : PageSetLSN(page, lsn);
743 15 : MarkBufferDirty(parentBuffer);
744 : }
745 15 : if (BufferIsValid(parentBuffer))
746 15 : UnlockReleaseBuffer(parentBuffer);
747 : }
748 : else
749 189 : Assert(xldata->innerIsParent || xldata->isRootSplit);
750 204 : }
751 :
752 : static void
753 26 : spgRedoVacuumLeaf(XLogReaderState *record)
754 : {
755 26 : XLogRecPtr lsn = record->EndRecPtr;
756 26 : char *ptr = XLogRecGetData(record);
757 26 : spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
758 : OffsetNumber *toDead;
759 : OffsetNumber *toPlaceholder;
760 : OffsetNumber *moveSrc;
761 : OffsetNumber *moveDest;
762 : OffsetNumber *chainSrc;
763 : OffsetNumber *chainDest;
764 : SpGistState state;
765 : Buffer buffer;
766 : Page page;
767 : int i;
768 :
769 26 : fillFakeState(&state, xldata->stateSrc);
770 :
771 26 : ptr += SizeOfSpgxlogVacuumLeaf;
772 26 : toDead = (OffsetNumber *) ptr;
773 26 : ptr += sizeof(OffsetNumber) * xldata->nDead;
774 26 : toPlaceholder = (OffsetNumber *) ptr;
775 26 : ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
776 26 : moveSrc = (OffsetNumber *) ptr;
777 26 : ptr += sizeof(OffsetNumber) * xldata->nMove;
778 26 : moveDest = (OffsetNumber *) ptr;
779 26 : ptr += sizeof(OffsetNumber) * xldata->nMove;
780 26 : chainSrc = (OffsetNumber *) ptr;
781 26 : ptr += sizeof(OffsetNumber) * xldata->nChain;
782 26 : chainDest = (OffsetNumber *) ptr;
783 :
784 26 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
785 : {
786 26 : page = BufferGetPage(buffer);
787 :
788 26 : spgPageIndexMultiDelete(&state, page,
789 26 : toDead, xldata->nDead,
790 : SPGIST_DEAD, SPGIST_DEAD,
791 : InvalidBlockNumber,
792 : InvalidOffsetNumber);
793 :
794 26 : spgPageIndexMultiDelete(&state, page,
795 26 : toPlaceholder, xldata->nPlaceholder,
796 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
797 : InvalidBlockNumber,
798 : InvalidOffsetNumber);
799 :
800 : /* see comments in vacuumLeafPage() */
801 52 : for (i = 0; i < xldata->nMove; i++)
802 : {
803 26 : ItemId idSrc = PageGetItemId(page, moveSrc[i]);
804 26 : ItemId idDest = PageGetItemId(page, moveDest[i]);
805 : ItemIdData tmp;
806 :
807 26 : tmp = *idSrc;
808 26 : *idSrc = *idDest;
809 26 : *idDest = tmp;
810 : }
811 :
812 26 : spgPageIndexMultiDelete(&state, page,
813 26 : moveSrc, xldata->nMove,
814 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
815 : InvalidBlockNumber,
816 : InvalidOffsetNumber);
817 :
818 57 : for (i = 0; i < xldata->nChain; i++)
819 : {
820 : SpGistLeafTuple lt;
821 :
822 31 : lt = (SpGistLeafTuple) PageGetItem(page,
823 31 : PageGetItemId(page, chainSrc[i]));
824 31 : Assert(lt->tupstate == SPGIST_LIVE);
825 31 : SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
826 : }
827 :
828 26 : PageSetLSN(page, lsn);
829 26 : MarkBufferDirty(buffer);
830 : }
831 26 : if (BufferIsValid(buffer))
832 26 : UnlockReleaseBuffer(buffer);
833 26 : }
834 :
835 : static void
836 UBC 0 : spgRedoVacuumRoot(XLogReaderState *record)
837 : {
838 0 : XLogRecPtr lsn = record->EndRecPtr;
839 0 : char *ptr = XLogRecGetData(record);
840 0 : spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
841 : OffsetNumber *toDelete;
842 : Buffer buffer;
843 : Page page;
844 :
845 0 : toDelete = xldata->offsets;
846 :
847 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
848 : {
849 0 : page = BufferGetPage(buffer);
850 :
851 : /* The tuple numbers are in order */
852 0 : PageIndexMultiDelete(page, toDelete, xldata->nDelete);
853 :
854 0 : PageSetLSN(page, lsn);
855 0 : MarkBufferDirty(buffer);
856 : }
857 0 : if (BufferIsValid(buffer))
858 0 : UnlockReleaseBuffer(buffer);
859 0 : }
860 :
861 : static void
862 CBC 369 : spgRedoVacuumRedirect(XLogReaderState *record)
863 : {
864 369 : XLogRecPtr lsn = record->EndRecPtr;
865 369 : char *ptr = XLogRecGetData(record);
866 369 : spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
867 : OffsetNumber *itemToPlaceholder;
868 : Buffer buffer;
869 :
870 369 : itemToPlaceholder = xldata->offsets;
871 :
872 : /*
873 : * If any redirection tuples are being removed, make sure there are no
874 : * live Hot Standby transactions that might need to see them.
875 : */
876 369 : if (InHotStandby)
877 : {
878 : RelFileLocator locator;
879 ECB :
880 GNC 369 : XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
881 369 : ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
882 369 : xldata->isCatalogRel,
883 : locator);
884 ECB : }
885 :
886 CBC 369 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
887 ECB : {
888 GIC 161 : Page page = BufferGetPage(buffer);
889 161 : SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
890 : int i;
891 ECB :
892 : /* Convert redirect pointers to plain placeholders */
893 GIC 164 : for (i = 0; i < xldata->nToPlaceholder; i++)
894 : {
895 ECB : SpGistDeadTuple dt;
896 :
897 CBC 3 : dt = (SpGistDeadTuple) PageGetItem(page,
898 3 : PageGetItemId(page, itemToPlaceholder[i]));
899 3 : Assert(dt->tupstate == SPGIST_REDIRECT);
900 GIC 3 : dt->tupstate = SPGIST_PLACEHOLDER;
901 3 : ItemPointerSetInvalid(&dt->pointer);
902 ECB : }
903 :
904 CBC 161 : Assert(opaque->nRedirection >= xldata->nToPlaceholder);
905 GIC 161 : opaque->nRedirection -= xldata->nToPlaceholder;
906 161 : opaque->nPlaceholder += xldata->nToPlaceholder;
907 ECB :
908 : /* Remove placeholder tuples at end of page */
909 CBC 161 : if (xldata->firstPlaceholder != InvalidOffsetNumber)
910 : {
911 GIC 161 : int max = PageGetMaxOffsetNumber(page);
912 ECB : OffsetNumber *toDelete;
913 :
914 CBC 161 : toDelete = palloc(sizeof(OffsetNumber) * max);
915 ECB :
916 GIC 11051 : for (i = xldata->firstPlaceholder; i <= max; i++)
917 CBC 10890 : toDelete[i - xldata->firstPlaceholder] = i;
918 ECB :
919 CBC 161 : i = max - xldata->firstPlaceholder + 1;
920 GIC 161 : Assert(opaque->nPlaceholder >= i);
921 161 : opaque->nPlaceholder -= i;
922 ECB :
923 : /* The array is sorted, so can use PageIndexMultiDelete */
924 CBC 161 : PageIndexMultiDelete(page, toDelete, i);
925 :
926 GIC 161 : pfree(toDelete);
927 ECB : }
928 :
929 GIC 161 : PageSetLSN(page, lsn);
930 CBC 161 : MarkBufferDirty(buffer);
931 ECB : }
932 CBC 369 : if (BufferIsValid(buffer))
933 GIC 369 : UnlockReleaseBuffer(buffer);
934 369 : }
935 ECB :
936 : void
937 CBC 39845 : spg_redo(XLogReaderState *record)
938 : {
939 GIC 39845 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
940 ECB : MemoryContext oldCxt;
941 :
942 GIC 39845 : oldCxt = MemoryContextSwitchTo(opCtx);
943 CBC 39845 : switch (info)
944 ECB : {
945 CBC 38967 : case XLOG_SPGIST_ADD_LEAF:
946 38967 : spgRedoAddLeaf(record);
947 38967 : break;
948 77 : case XLOG_SPGIST_MOVE_LEAFS:
949 77 : spgRedoMoveLeafs(record);
950 77 : break;
951 101 : case XLOG_SPGIST_ADD_NODE:
952 101 : spgRedoAddNode(record);
953 101 : break;
954 101 : case XLOG_SPGIST_SPLIT_TUPLE:
955 101 : spgRedoSplitTuple(record);
956 101 : break;
957 204 : case XLOG_SPGIST_PICKSPLIT:
958 204 : spgRedoPickSplit(record);
959 204 : break;
960 26 : case XLOG_SPGIST_VACUUM_LEAF:
961 GBC 26 : spgRedoVacuumLeaf(record);
962 26 : break;
963 UBC 0 : case XLOG_SPGIST_VACUUM_ROOT:
964 LBC 0 : spgRedoVacuumRoot(record);
965 0 : break;
966 CBC 369 : case XLOG_SPGIST_VACUUM_REDIRECT:
967 GBC 369 : spgRedoVacuumRedirect(record);
968 369 : break;
969 UIC 0 : default:
970 0 : elog(PANIC, "spg_redo: unknown op code %u", info);
971 ECB : }
972 :
973 CBC 39845 : MemoryContextSwitchTo(oldCxt);
974 GIC 39845 : MemoryContextReset(opCtx);
975 39845 : }
976 ECB :
977 : void
978 CBC 141 : spg_xlog_startup(void)
979 : {
980 GIC 141 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
981 ECB : "SP-GiST temporary context",
982 : ALLOCSET_DEFAULT_SIZES);
983 GIC 141 : }
984 ECB :
985 : void
986 CBC 108 : spg_xlog_cleanup(void)
987 ECB : {
988 CBC 108 : MemoryContextDelete(opCtx);
989 GIC 108 : opCtx = NULL;
990 108 : }
991 :
992 : /*
993 : * Mask a SpGist page before performing consistency checks on it.
994 EUB : */
995 : void
996 UBC 0 : spg_mask(char *pagedata, BlockNumber blkno)
997 EUB : {
998 UIC 0 : Page page = (Page) pagedata;
999 UBC 0 : PageHeader pagehdr = (PageHeader) page;
1000 :
1001 0 : mask_page_lsn_and_checksum(page);
1002 :
1003 UIC 0 : mask_page_hint_bits(page);
1004 :
1005 : /*
1006 : * Mask the unused space, but only if the page's pd_lower appears to have
1007 EUB : * been set correctly.
1008 : */
1009 UBC 0 : if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1010 UIC 0 : mask_unused_space(page);
1011 0 : }
|