Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgdoinsert.c
4 : * implementation of insert algorithm
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgdoinsert.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/genam.h"
19 : #include "access/spgist_private.h"
20 : #include "access/spgxlog.h"
21 : #include "access/xloginsert.h"
22 : #include "common/pg_prng.h"
23 : #include "miscadmin.h"
24 : #include "storage/bufmgr.h"
25 : #include "utils/rel.h"
26 :
27 :
28 : /*
29 : * SPPageDesc tracks all info about a page we are inserting into. In some
30 : * situations it actually identifies a tuple, or even a specific node within
31 : * an inner tuple. But any of the fields can be invalid. If the buffer
32 : * field is valid, it implies we hold pin and exclusive lock on that buffer.
33 : * page pointer should be valid exactly when buffer is.
34 : */
35 : typedef struct SPPageDesc
36 : {
37 : BlockNumber blkno; /* block number, or InvalidBlockNumber */
38 : Buffer buffer; /* page's buffer number, or InvalidBuffer */
39 : Page page; /* pointer to page buffer, or NULL */
40 : OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
41 : int node; /* node number within inner tuple, or -1 */
42 : } SPPageDesc;
43 :
44 :
45 : /*
46 : * Set the item pointer in the nodeN'th entry in inner tuple tup. This
47 : * is used to update the parent inner tuple's downlink after a move or
48 : * split operation.
49 : */
50 : void
4129 tgl 51 CBC 5963 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
52 : BlockNumber blkno, OffsetNumber offset)
53 : {
54 : int i;
55 : SpGistNodeTuple node;
56 :
4131 57 30652 : SGITITERATE(tup, i, node)
58 : {
59 30652 : if (i == nodeN)
60 : {
61 5963 : ItemPointerSet(&node->t_tid, blkno, offset);
62 5963 : return;
63 : }
64 : }
65 :
4131 tgl 66 UBC 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
67 : nodeN);
68 : }
69 :
70 : /*
71 : * Form a new inner tuple containing one more node than the given one, with
72 : * the specified label datum, inserted at offset "offset" in the node array.
73 : * The new tuple's prefix is the same as the old one's.
74 : *
75 : * Note that the new node initially has an invalid downlink. We'll find a
76 : * page to point it to later.
77 : */
78 : static SpGistInnerTuple
4131 tgl 79 CBC 783 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
80 : {
81 : SpGistNodeTuple node,
82 : *nodes;
83 : int i;
84 :
85 : /* if offset is negative, insert at end */
86 783 : if (offset < 0)
4131 tgl 87 UBC 0 : offset = tuple->nNodes;
4131 tgl 88 CBC 783 : else if (offset > tuple->nNodes)
4131 tgl 89 UBC 0 : elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
90 :
4131 tgl 91 CBC 783 : nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
92 5036 : SGITITERATE(tuple, i, node)
93 : {
94 4253 : if (i < offset)
95 3646 : nodes[i] = node;
96 : else
97 607 : nodes[i + 1] = node;
98 : }
99 :
100 783 : nodes[offset] = spgFormNodeTuple(state, label, false);
101 :
102 783 : return spgFormInnerTuple(state,
103 783 : (tuple->prefixSize > 0),
104 368 : SGITDATUM(tuple, state),
105 783 : tuple->nNodes + 1,
106 : nodes);
107 : }
108 :
109 : /* qsort comparator for sorting OffsetNumbers */
110 : static int
111 2832902 : cmpOffsetNumbers(const void *a, const void *b)
112 : {
113 2832902 : if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
4131 tgl 114 UBC 0 : return 0;
4131 tgl 115 CBC 2832902 : return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
116 : }
117 :
118 : /*
119 : * Delete multiple tuples from an index page, preserving tuple offset numbers.
120 : *
121 : * The first tuple in the given list is replaced with a dead tuple of type
122 : * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
123 : * with dead tuples of type "reststate". If either firststate or reststate
124 : * is REDIRECT, blkno/offnum specify where to link to.
125 : *
126 : * NB: this is used during WAL replay, so beware of trying to make it too
127 : * smart. In particular, it shouldn't use "state" except for calling
128 : * spgFormDeadTuple(). This is also used in a critical section, so no
129 : * pallocs either!
130 : */
131 : void
132 4302 : spgPageIndexMultiDelete(SpGistState *state, Page page,
133 : OffsetNumber *itemnos, int nitems,
134 : int firststate, int reststate,
135 : BlockNumber blkno, OffsetNumber offnum)
136 : {
137 : OffsetNumber firstItem;
138 : OffsetNumber sortednos[MaxIndexTuplesPerPage];
139 4302 : SpGistDeadTuple tuple = NULL;
140 : int i;
141 :
142 4302 : if (nitems == 0)
143 104 : return; /* nothing to do */
144 :
145 : /*
146 : * For efficiency we want to use PageIndexMultiDelete, which requires the
147 : * targets to be listed in sorted order, so we have to sort the itemnos
148 : * array. (This also greatly simplifies the math for reinserting the
149 : * replacement tuples.) However, we must not scribble on the caller's
150 : * array, so we have to make a copy.
151 : */
152 4198 : memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
153 4198 : if (nitems > 1)
154 4109 : qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
155 :
156 4198 : PageIndexMultiDelete(page, sortednos, nitems);
157 :
158 4198 : firstItem = itemnos[0];
159 :
160 409536 : for (i = 0; i < nitems; i++)
161 : {
3955 bruce 162 405338 : OffsetNumber itemno = sortednos[i];
163 : int tupstate;
164 :
4131 tgl 165 405338 : tupstate = (itemno == firstItem) ? firststate : reststate;
166 405338 : if (tuple == NULL || tuple->tupstate != tupstate)
167 6487 : tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
168 :
169 405338 : if (PageAddItem(page, (Item) tuple, tuple->size,
170 : itemno, false, false) != itemno)
4131 tgl 171 UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
172 : tuple->size);
173 :
4131 tgl 174 CBC 405338 : if (tupstate == SPGIST_REDIRECT)
175 1179 : SpGistPageGetOpaque(page)->nRedirection++;
176 404159 : else if (tupstate == SPGIST_PLACEHOLDER)
177 404159 : SpGistPageGetOpaque(page)->nPlaceholder++;
178 : }
179 : }
180 :
181 : /*
182 : * Update the parent inner tuple's downlink, and mark the parent buffer
183 : * dirty (this must be the last change to the parent page in the current
184 : * WAL action).
185 : */
186 : static void
187 4923 : saveNodeLink(Relation index, SPPageDesc *parent,
188 : BlockNumber blkno, OffsetNumber offnum)
189 : {
190 : SpGistInnerTuple innerTuple;
191 :
192 4923 : innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
2118 193 4923 : PageGetItemId(parent->page, parent->offnum));
194 :
4129 195 4923 : spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
196 :
4131 197 4923 : MarkBufferDirty(parent->buffer);
198 4923 : }
199 :
200 : /*
201 : * Add a leaf tuple to a leaf page where there is known to be room for it
202 : */
203 : static void
204 398773 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
205 : SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
206 : {
207 : spgxlogAddLeaf xlrec;
208 :
209 398773 : xlrec.newPage = isNew;
4046 210 398773 : xlrec.storesNulls = isNulls;
211 :
212 : /* these will be filled below as needed */
4131 213 398773 : xlrec.offnumLeaf = InvalidOffsetNumber;
214 398773 : xlrec.offnumHeadLeaf = InvalidOffsetNumber;
215 398773 : xlrec.offnumParent = InvalidOffsetNumber;
216 398773 : xlrec.nodeI = 0;
217 :
218 398773 : START_CRIT_SECTION();
219 :
220 398773 : if (current->offnum == InvalidOffsetNumber ||
4046 221 397760 : SpGistBlockIsRoot(current->blkno))
222 : {
223 : /* Tuple is not part of a chain */
734 224 9494 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
4131 225 18988 : current->offnum = SpGistPageAddNewItem(state, current->page,
2118 226 9494 : (Item) leafTuple, leafTuple->size,
227 : NULL, false);
228 :
4131 229 9494 : xlrec.offnumLeaf = current->offnum;
230 :
231 : /* Must update parent's downlink if any */
232 9494 : if (parent->buffer != InvalidBuffer)
233 : {
234 1013 : xlrec.offnumParent = parent->offnum;
235 1013 : xlrec.nodeI = parent->node;
236 :
237 1013 : saveNodeLink(index, parent, current->blkno, current->offnum);
238 : }
239 : }
240 : else
241 : {
242 : /*
243 : * Tuple must be inserted into existing chain. We mustn't change the
244 : * chain's head address, but we don't need to chase the entire chain
245 : * to put the tuple at the end; we can insert it second.
246 : *
247 : * Also, it's possible that the "chain" consists only of a DEAD tuple,
248 : * in which case we should replace the DEAD tuple in-place.
249 : */
250 : SpGistLeafTuple head;
251 : OffsetNumber offnum;
252 :
253 389279 : head = (SpGistLeafTuple) PageGetItem(current->page,
2118 254 389279 : PageGetItemId(current->page, current->offnum));
4131 255 389279 : if (head->tupstate == SPGIST_LIVE)
256 : {
734 257 389279 : SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
4131 258 389279 : offnum = SpGistPageAddNewItem(state, current->page,
259 389279 : (Item) leafTuple, leafTuple->size,
260 : NULL, false);
261 :
262 : /*
263 : * re-get head of list because it could have been moved on page,
264 : * and set new second element
265 : */
266 389279 : head = (SpGistLeafTuple) PageGetItem(current->page,
2118 267 389279 : PageGetItemId(current->page, current->offnum));
734 268 389279 : SGLT_SET_NEXTOFFSET(head, offnum);
269 :
4131 270 389279 : xlrec.offnumLeaf = offnum;
271 389279 : xlrec.offnumHeadLeaf = current->offnum;
272 : }
4131 tgl 273 UBC 0 : else if (head->tupstate == SPGIST_DEAD)
274 : {
734 275 0 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
4131 276 0 : PageIndexTupleDelete(current->page, current->offnum);
277 0 : if (PageAddItem(current->page,
278 : (Item) leafTuple, leafTuple->size,
279 0 : current->offnum, false, false) != current->offnum)
280 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
281 : leafTuple->size);
282 :
283 : /* WAL replay distinguishes this case by equal offnums */
284 0 : xlrec.offnumLeaf = current->offnum;
285 0 : xlrec.offnumHeadLeaf = current->offnum;
286 : }
287 : else
288 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
289 : }
290 :
4131 tgl 291 CBC 398773 : MarkBufferDirty(current->buffer);
292 :
1467 heikki.linnakangas 293 398773 : if (RelationNeedsWAL(index) && !state->isBuild)
294 : {
295 : XLogRecPtr recptr;
296 : int flags;
297 :
3062 298 120190 : XLogBeginInsert();
299 120190 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
300 120190 : XLogRegisterData((char *) leafTuple, leafTuple->size);
301 :
2820 302 120190 : flags = REGBUF_STANDARD;
303 120190 : if (xlrec.newPage)
304 1 : flags |= REGBUF_WILL_INIT;
305 120190 : XLogRegisterBuffer(0, current->buffer, flags);
3062 306 120190 : if (xlrec.offnumParent != InvalidOffsetNumber)
307 415 : XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
308 :
309 120190 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
310 :
4131 tgl 311 120190 : PageSetLSN(current->page, recptr);
312 :
313 : /* update parent only if we actually changed it */
3062 heikki.linnakangas 314 120190 : if (xlrec.offnumParent != InvalidOffsetNumber)
315 : {
4131 tgl 316 415 : PageSetLSN(parent->page, recptr);
317 : }
318 : }
319 :
320 398773 : END_CRIT_SECTION();
321 398773 : }
322 :
323 : /*
324 : * Count the number and total size of leaf tuples in the chain starting at
325 : * current->offnum. Return number into *nToSplit and total size as function
326 : * result.
327 : *
328 : * Klugy special case when considering the root page (i.e., root is a leaf
329 : * page, but we're about to split for the first time): return fake large
330 : * values to force spgdoinsert() to take the doPickSplit rather than
331 : * moveLeafs code path. moveLeafs is not prepared to deal with root page.
332 : */
333 : static int
334 3948 : checkSplitConditions(Relation index, SpGistState *state,
335 : SPPageDesc *current, int *nToSplit)
336 : {
337 : int i,
338 3948 : n = 0,
339 3948 : totalSize = 0;
340 :
4046 341 3948 : if (SpGistBlockIsRoot(current->blkno))
342 : {
343 : /* return impossible values to force split */
4131 344 41 : *nToSplit = BLCKSZ;
345 41 : return BLCKSZ;
346 : }
347 :
348 3907 : i = current->offnum;
349 400149 : while (i != InvalidOffsetNumber)
350 : {
351 : SpGistLeafTuple it;
352 :
353 396242 : Assert(i >= FirstOffsetNumber &&
354 : i <= PageGetMaxOffsetNumber(current->page));
355 396242 : it = (SpGistLeafTuple) PageGetItem(current->page,
356 : PageGetItemId(current->page, i));
357 396242 : if (it->tupstate == SPGIST_LIVE)
358 : {
359 396242 : n++;
360 396242 : totalSize += it->size + sizeof(ItemIdData);
361 : }
4131 tgl 362 UBC 0 : else if (it->tupstate == SPGIST_DEAD)
363 : {
364 : /* We could see a DEAD tuple as first/only chain item */
365 0 : Assert(i == current->offnum);
734 366 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
367 : /* Don't count it in result, because it won't go to other page */
368 : }
369 : else
4131 370 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
371 :
734 tgl 372 CBC 396242 : i = SGLT_GET_NEXTOFFSET(it);
373 : }
374 :
4131 375 3907 : *nToSplit = n;
376 :
377 3907 : return totalSize;
378 : }
379 :
380 : /*
381 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
382 : * but the chain has to be moved because there's not enough room to add
383 : * newLeafTuple to its page. We use this method when the chain contains
384 : * very little data so a split would be inefficient. We are sure we can
385 : * fit the chain plus newLeafTuple on one other page.
386 : */
387 : static void
388 1108 : moveLeafs(Relation index, SpGistState *state,
389 : SPPageDesc *current, SPPageDesc *parent,
390 : SpGistLeafTuple newLeafTuple, bool isNulls)
391 : {
392 : int i,
393 : nDelete,
394 : nInsert,
395 : size;
396 : Buffer nbuf;
397 : Page npage;
398 1108 : OffsetNumber r = InvalidOffsetNumber,
399 1108 : startOffset = InvalidOffsetNumber;
4131 tgl 400 GIC 1108 : bool replaceDead = false;
401 : OffsetNumber *toDelete;
402 : OffsetNumber *toInsert;
403 : BlockNumber nblkno;
404 : spgxlogMoveLeafs xlrec;
405 : char *leafdata,
406 : *leafptr;
407 :
4131 tgl 408 ECB : /* This doesn't work on root page */
4131 tgl 409 CBC 1108 : Assert(parent->buffer != InvalidBuffer);
4131 tgl 410 GIC 1108 : Assert(parent->buffer != current->buffer);
411 :
4131 tgl 412 ECB : /* Locate the tuples to be moved, and count up the space needed */
4131 tgl 413 CBC 1108 : i = PageGetMaxOffsetNumber(current->page);
414 1108 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
4131 tgl 415 GIC 1108 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
4131 tgl 416 ECB :
4131 tgl 417 GIC 1108 : size = newLeafTuple->size + sizeof(ItemIdData);
4131 tgl 418 ECB :
4131 tgl 419 CBC 1108 : nDelete = 0;
420 1108 : i = current->offnum;
4131 tgl 421 GIC 45299 : while (i != InvalidOffsetNumber)
422 : {
423 : SpGistLeafTuple it;
4131 tgl 424 ECB :
4131 tgl 425 GIC 44191 : Assert(i >= FirstOffsetNumber &&
4131 tgl 426 ECB : i <= PageGetMaxOffsetNumber(current->page));
4131 tgl 427 GIC 44191 : it = (SpGistLeafTuple) PageGetItem(current->page,
428 : PageGetItemId(current->page, i));
4131 tgl 429 ECB :
4131 tgl 430 GIC 44191 : if (it->tupstate == SPGIST_LIVE)
4131 tgl 431 ECB : {
4131 tgl 432 CBC 44191 : toDelete[nDelete] = i;
433 44191 : size += it->size + sizeof(ItemIdData);
4131 tgl 434 GIC 44191 : nDelete++;
4131 tgl 435 EUB : }
4131 tgl 436 UIC 0 : else if (it->tupstate == SPGIST_DEAD)
437 : {
4131 tgl 438 EUB : /* We could see a DEAD tuple as first/only chain item */
4131 tgl 439 UBC 0 : Assert(i == current->offnum);
734 tgl 440 UIC 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
4131 tgl 441 EUB : /* We don't want to move it, so don't count it in size */
4131 tgl 442 UBC 0 : toDelete[nDelete] = i;
443 0 : nDelete++;
4131 tgl 444 UIC 0 : replaceDead = true;
445 : }
4131 tgl 446 EUB : else
4131 tgl 447 UIC 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
4131 tgl 448 ECB :
734 tgl 449 GIC 44191 : i = SGLT_GET_NEXTOFFSET(it);
450 : }
451 :
4131 tgl 452 ECB : /* Find a leaf page that will hold them */
4046 tgl 453 GIC 1108 : nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
4046 tgl 454 ECB : size, &xlrec.newPage);
2545 kgrittn 455 CBC 1108 : npage = BufferGetPage(nbuf);
4131 tgl 456 1108 : nblkno = BufferGetBlockNumber(nbuf);
4131 tgl 457 GIC 1108 : Assert(nblkno != current->blkno);
4131 tgl 458 ECB :
4131 tgl 459 GIC 1108 : leafdata = leafptr = palloc(size);
4131 tgl 460 ECB :
4131 tgl 461 GIC 1108 : START_CRIT_SECTION();
462 :
4131 tgl 463 ECB : /* copy all the old tuples to new page, unless they're dead */
4131 tgl 464 CBC 1108 : nInsert = 0;
4131 tgl 465 GIC 1108 : if (!replaceDead)
4131 tgl 466 ECB : {
4131 tgl 467 GIC 45299 : for (i = 0; i < nDelete; i++)
468 : {
469 : SpGistLeafTuple it;
470 :
471 44191 : it = (SpGistLeafTuple) PageGetItem(current->page,
2118 tgl 472 CBC 44191 : PageGetItemId(current->page, toDelete[i]));
4131 473 44191 : Assert(it->tupstate == SPGIST_LIVE);
4131 tgl 474 ECB :
475 : /*
476 : * Update chain link (notice the chain order gets reversed, but we
477 : * don't care). We're modifying the tuple on the source page
478 : * here, but it's okay since we're about to delete it.
479 : */
734 tgl 480 GIC 44191 : SGLT_SET_NEXTOFFSET(it, r);
4131 tgl 481 ECB :
4131 tgl 482 GIC 44191 : r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
4131 tgl 483 ECB : &startOffset, false);
484 :
4131 tgl 485 GIC 44191 : toInsert[nInsert] = r;
4131 tgl 486 CBC 44191 : nInsert++;
4131 tgl 487 ECB :
488 : /* save modified tuple into leafdata as well */
4131 tgl 489 GIC 44191 : memcpy(leafptr, it, it->size);
4131 tgl 490 CBC 44191 : leafptr += it->size;
4131 tgl 491 ECB : }
492 : }
493 :
494 : /* add the new tuple as well */
734 tgl 495 GIC 1108 : SGLT_SET_NEXTOFFSET(newLeafTuple, r);
4131 tgl 496 CBC 1108 : r = SpGistPageAddNewItem(state, npage,
497 1108 : (Item) newLeafTuple, newLeafTuple->size,
4131 tgl 498 ECB : &startOffset, false);
4131 tgl 499 GIC 1108 : toInsert[nInsert] = r;
4131 tgl 500 CBC 1108 : nInsert++;
501 1108 : memcpy(leafptr, newLeafTuple, newLeafTuple->size);
502 1108 : leafptr += newLeafTuple->size;
4131 tgl 503 ECB :
504 : /*
505 : * Now delete the old tuples, leaving a redirection pointer behind for the
506 : * first one, unless we're doing an index build; in which case there can't
507 : * be any concurrent scan so we need not provide a redirect.
508 : */
4131 tgl 509 GIC 1108 : spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
2118 tgl 510 CBC 1108 : state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
4131 tgl 511 ECB : SPGIST_PLACEHOLDER,
512 : nblkno, r);
513 :
514 : /* Update parent's downlink and mark parent page dirty */
4131 tgl 515 GIC 1108 : saveNodeLink(index, parent, nblkno, r);
4131 tgl 516 ECB :
517 : /* Mark the leaf pages too */
4131 tgl 518 GIC 1108 : MarkBufferDirty(current->buffer);
4131 tgl 519 CBC 1108 : MarkBufferDirty(nbuf);
4131 tgl 520 ECB :
1467 heikki.linnakangas 521 GIC 1108 : if (RelationNeedsWAL(index) && !state->isBuild)
4131 tgl 522 ECB : {
523 : XLogRecPtr recptr;
524 :
525 : /* prepare WAL info */
3062 heikki.linnakangas 526 GIC 262 : STORE_STATE(state, xlrec.stateSrc);
4131 tgl 527 ECB :
3062 heikki.linnakangas 528 GIC 262 : xlrec.nMoves = nDelete;
3062 heikki.linnakangas 529 CBC 262 : xlrec.replaceDead = replaceDead;
530 262 : xlrec.storesNulls = isNulls;
3062 heikki.linnakangas 531 ECB :
3062 heikki.linnakangas 532 GIC 262 : xlrec.offnumParent = parent->offnum;
3062 heikki.linnakangas 533 CBC 262 : xlrec.nodeI = parent->node;
3062 heikki.linnakangas 534 ECB :
3062 heikki.linnakangas 535 GIC 262 : XLogBeginInsert();
3062 heikki.linnakangas 536 CBC 262 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
537 262 : XLogRegisterData((char *) toDelete,
3062 heikki.linnakangas 538 ECB : sizeof(OffsetNumber) * nDelete);
3062 heikki.linnakangas 539 GIC 262 : XLogRegisterData((char *) toInsert,
3062 heikki.linnakangas 540 ECB : sizeof(OffsetNumber) * nInsert);
3062 heikki.linnakangas 541 GIC 262 : XLogRegisterData((char *) leafdata, leafptr - leafdata);
3062 heikki.linnakangas 542 ECB :
3062 heikki.linnakangas 543 GIC 262 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
3062 heikki.linnakangas 544 CBC 262 : XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
545 262 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
3062 heikki.linnakangas 546 ECB :
3062 heikki.linnakangas 547 GIC 262 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
4131 tgl 548 ECB :
4131 tgl 549 GIC 262 : PageSetLSN(current->page, recptr);
4131 tgl 550 CBC 262 : PageSetLSN(npage, recptr);
551 262 : PageSetLSN(parent->page, recptr);
4131 tgl 552 ECB : }
553 :
4131 tgl 554 GIC 1108 : END_CRIT_SECTION();
4131 tgl 555 ECB :
556 : /* Update local free-space cache and release new buffer */
4131 tgl 557 GIC 1108 : SpGistSetLastUsedPage(index, nbuf);
4131 tgl 558 CBC 1108 : UnlockReleaseBuffer(nbuf);
559 1108 : }
4131 tgl 560 ECB :
561 : /*
562 : * Update previously-created redirection tuple with appropriate destination
563 : *
564 : * We use this when it's not convenient to know the destination first.
565 : * The tuple should have been made with the "impossible" destination of
566 : * the metapage.
567 : */
568 : static void
4131 tgl 569 GIC 639 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
4131 tgl 570 ECB : BlockNumber blkno, OffsetNumber offnum)
571 : {
572 : SpGistDeadTuple dt;
573 :
4131 tgl 574 GIC 639 : dt = (SpGistDeadTuple) PageGetItem(current->page,
2118 tgl 575 ECB : PageGetItemId(current->page, position));
4131 tgl 576 GIC 639 : Assert(dt->tupstate == SPGIST_REDIRECT);
4131 tgl 577 CBC 639 : Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
578 639 : ItemPointerSet(&dt->pointer, blkno, offnum);
579 639 : }
4131 tgl 580 ECB :
581 : /*
582 : * Test to see if the user-defined picksplit function failed to do its job,
583 : * ie, it put all the leaf tuples into the same node.
584 : * If so, randomly divide the tuples into several nodes (all with the same
585 : * label) and return true to select allTheSame mode for this inner tuple.
586 : *
587 : * (This code is also used to forcibly select allTheSame mode for nulls.)
588 : *
589 : * If we know that the leaf tuples wouldn't all fit on one page, then we
590 : * exclude the last tuple (which is the incoming new tuple that forced a split)
591 : * from the check to see if more than one node is used. The reason for this
592 : * is that if the existing tuples are put into only one chain, then even if
593 : * we move them all to an empty page, there would still not be room for the
594 : * new tuple, so we'd get into an infinite loop of picksplit attempts.
595 : * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
596 : * be split across pages. (Exercise for the reader: figure out why this
597 : * fixes the problem even when there is only one old tuple.)
598 : */
599 : static bool
4131 tgl 600 GIC 2840 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
4131 tgl 601 ECB : bool *includeNew)
602 : {
603 : int theNode;
604 : int limit;
605 : int i;
606 :
607 : /* For the moment, assume we can include the new leaf tuple */
4131 tgl 608 GIC 2840 : *includeNew = true;
4131 tgl 609 ECB :
610 : /* If there's only the new leaf tuple, don't select allTheSame mode */
4131 tgl 611 GIC 2840 : if (in->nTuples <= 1)
4131 tgl 612 CBC 22 : return false;
4131 tgl 613 ECB :
614 : /* If tuple set doesn't fit on one page, ignore the new tuple in test */
4131 tgl 615 GIC 2818 : limit = tooBig ? in->nTuples - 1 : in->nTuples;
4131 tgl 616 ECB :
617 : /* Check to see if more than one node is populated */
4131 tgl 618 GIC 2818 : theNode = out->mapTuplesToNodes[0];
4131 tgl 619 CBC 63028 : for (i = 1; i < limit; i++)
4131 tgl 620 ECB : {
4131 tgl 621 GIC 62876 : if (out->mapTuplesToNodes[i] != theNode)
4131 tgl 622 CBC 2666 : return false;
4131 tgl 623 ECB : }
624 :
625 : /* Nope, so override the picksplit function's decisions */
626 :
627 : /* If the new tuple is in its own node, it can't be included in split */
4131 tgl 628 GIC 152 : if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
4131 tgl 629 LBC 0 : *includeNew = false;
4131 tgl 630 EUB :
4131 tgl 631 GIC 152 : out->nNodes = 8; /* arbitrary number of child nodes */
4131 tgl 632 ECB :
633 : /* Random assignment of tuples to nodes (note we include new tuple) */
4131 tgl 634 GIC 16118 : for (i = 0; i < in->nTuples; i++)
4131 tgl 635 CBC 15966 : out->mapTuplesToNodes[i] = i % out->nNodes;
4131 tgl 636 ECB :
637 : /* The opclass may not use node labels, but if it does, duplicate 'em */
4131 tgl 638 GIC 152 : if (out->nodeLabels)
4131 tgl 639 ECB : {
3955 bruce 640 GIC 26 : Datum theLabel = out->nodeLabels[theNode];
4131 tgl 641 ECB :
4131 tgl 642 GIC 26 : out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
4131 tgl 643 CBC 234 : for (i = 0; i < out->nNodes; i++)
644 208 : out->nodeLabels[i] = theLabel;
4131 tgl 645 ECB : }
646 :
647 : /* We don't touch the prefix or the leaf tuple datum assignments */
648 :
4131 tgl 649 GIC 152 : return true;
4131 tgl 650 ECB : }
651 :
652 : /*
653 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
654 : * but the chain has to be split because there's not enough room to add
655 : * newLeafTuple to its page.
656 : *
657 : * This function splits the leaf tuple set according to picksplit's rules,
658 : * creating one or more new chains that are spread across the current page
659 : * and an additional leaf page (we assume that two leaf pages will be
660 : * sufficient). A new inner tuple is created, and the parent downlink
661 : * pointer is updated to point to that inner tuple instead of the leaf chain.
662 : *
663 : * On exit, current contains the address of the new inner tuple.
664 : *
665 : * Returns true if we successfully inserted newLeafTuple during this function,
666 : * false if caller still has to do it (meaning another picksplit operation is
667 : * probably needed). Failure could occur if the picksplit result is fairly
668 : * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
669 : * Because we force the picksplit result to be at least two chains, each
670 : * cycle will get rid of at least one leaf tuple from the chain, so the loop
671 : * will eventually terminate if lack of balance is the issue. If the tuple
672 : * is too big, we assume that repeated picksplit operations will eventually
673 : * make it small enough by repeated prefix-stripping. A broken opclass could
674 : * make this an infinite loop, though, so spgdoinsert() checks that the
675 : * leaf datums get smaller each time.
676 : */
677 : static bool
4131 tgl 678 GIC 2840 : doPickSplit(Relation index, SpGistState *state,
4131 tgl 679 ECB : SPPageDesc *current, SPPageDesc *parent,
680 : SpGistLeafTuple newLeafTuple,
681 : int level, bool isNulls, bool isNew)
682 : {
4131 tgl 683 GIC 2840 : bool insertedNew = false;
4131 tgl 684 ECB : spgPickSplitIn in;
685 : spgPickSplitOut out;
686 : FmgrInfo *procinfo;
687 : bool includeNew;
688 : int i,
689 : max,
690 : n;
691 : SpGistInnerTuple innerTuple;
692 : SpGistNodeTuple node,
693 : *nodes;
694 : Buffer newInnerBuffer,
695 : newLeafBuffer;
696 : uint8 *leafPageSelect;
697 : int *leafSizes;
698 : OffsetNumber *toDelete;
699 : OffsetNumber *toInsert;
4131 tgl 700 GIC 2840 : OffsetNumber redirectTuplePos = InvalidOffsetNumber;
4131 tgl 701 ECB : OffsetNumber startOffsets[2];
702 : SpGistLeafTuple *oldLeafs;
703 : SpGistLeafTuple *newLeafs;
704 : Datum leafDatums[INDEX_MAX_KEYS];
705 : bool leafIsnulls[INDEX_MAX_KEYS];
706 : int spaceToDelete;
707 : int currentFreeSpace;
708 : int totalLeafSizes;
709 : bool allTheSame;
710 : spgxlogPickSplit xlrec;
711 : char *leafdata,
712 : *leafptr;
713 : SPPageDesc saveCurrent;
714 : int nToDelete,
715 : nToInsert,
716 : maxToInclude;
717 :
4131 tgl 718 GIC 2840 : in.level = level;
4131 tgl 719 ECB :
720 : /*
721 : * Allocate per-leaf-tuple work arrays with max possible size
722 : */
4131 tgl 723 GIC 2840 : max = PageGetMaxOffsetNumber(current->page);
4131 tgl 724 CBC 2840 : n = max + 1;
725 2840 : in.datums = (Datum *) palloc(sizeof(Datum) * n);
726 2840 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
727 2840 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
734 728 2840 : oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
4131 729 2840 : newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
730 2840 : leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
4131 tgl 731 ECB :
4131 tgl 732 GIC 2840 : STORE_STATE(state, xlrec.stateSrc);
4131 tgl 733 ECB :
734 : /*
735 : * Form list of leaf tuples which will be distributed as split result;
736 : * also, count up the amount of space that will be freed from current.
737 : * (Note that in the non-root case, we won't actually delete the old
738 : * tuples, only replace them with redirects or placeholders.)
739 : */
4131 tgl 740 GIC 2840 : nToInsert = 0;
4131 tgl 741 CBC 2840 : nToDelete = 0;
742 2840 : spaceToDelete = 0;
4046 743 2840 : if (SpGistBlockIsRoot(current->blkno))
4131 tgl 744 ECB : {
745 : /*
746 : * We are splitting the root (which up to now is also a leaf page).
747 : * Its tuples are not linked, so scan sequentially to get them all. We
748 : * ignore the original value of current->offnum.
749 : */
4131 tgl 750 GIC 7709 : for (i = FirstOffsetNumber; i <= max; i++)
4131 tgl 751 ECB : {
752 : SpGistLeafTuple it;
753 :
4131 tgl 754 GIC 7668 : it = (SpGistLeafTuple) PageGetItem(current->page,
2118 tgl 755 ECB : PageGetItemId(current->page, i));
4131 tgl 756 GIC 7668 : if (it->tupstate == SPGIST_LIVE)
4131 tgl 757 ECB : {
738 tgl 758 GIC 7668 : in.datums[nToInsert] =
738 tgl 759 CBC 7668 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
734 760 7668 : oldLeafs[nToInsert] = it;
4131 761 7668 : nToInsert++;
762 7668 : toDelete[nToDelete] = i;
763 7668 : nToDelete++;
4131 tgl 764 ECB : /* we will delete the tuple altogether, so count full space */
4131 tgl 765 GIC 7668 : spaceToDelete += it->size + sizeof(ItemIdData);
4131 tgl 766 ECB : }
767 : else /* tuples on root should be live */
4131 tgl 768 UIC 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
4131 tgl 769 EUB : }
770 : }
771 : else
772 : {
773 : /* Normal case, just collect the leaf tuples in the chain */
4131 tgl 774 GIC 2799 : i = current->offnum;
4131 tgl 775 CBC 354850 : while (i != InvalidOffsetNumber)
4131 tgl 776 ECB : {
777 : SpGistLeafTuple it;
778 :
4131 tgl 779 GIC 352051 : Assert(i >= FirstOffsetNumber && i <= max);
4131 tgl 780 CBC 352051 : it = (SpGistLeafTuple) PageGetItem(current->page,
2118 tgl 781 ECB : PageGetItemId(current->page, i));
4131 tgl 782 GIC 352051 : if (it->tupstate == SPGIST_LIVE)
4131 tgl 783 ECB : {
738 tgl 784 GIC 352051 : in.datums[nToInsert] =
738 tgl 785 CBC 352051 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
734 786 352051 : oldLeafs[nToInsert] = it;
4131 787 352051 : nToInsert++;
788 352051 : toDelete[nToDelete] = i;
789 352051 : nToDelete++;
4131 tgl 790 ECB : /* we will not delete the tuple, only replace with dead */
4131 tgl 791 GIC 352051 : Assert(it->size >= SGDTSIZE);
4131 tgl 792 CBC 352051 : spaceToDelete += it->size - SGDTSIZE;
4131 tgl 793 ECB : }
4131 tgl 794 UIC 0 : else if (it->tupstate == SPGIST_DEAD)
4131 tgl 795 EUB : {
796 : /* We could see a DEAD tuple as first/only chain item */
4131 tgl 797 UIC 0 : Assert(i == current->offnum);
734 tgl 798 UBC 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
4131 799 0 : toDelete[nToDelete] = i;
800 0 : nToDelete++;
4131 tgl 801 EUB : /* replacing it with redirect will save no space */
802 : }
803 : else
4131 tgl 804 UIC 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
4131 tgl 805 EUB :
734 tgl 806 GIC 352051 : i = SGLT_GET_NEXTOFFSET(it);
4131 tgl 807 ECB : }
808 : }
4131 tgl 809 GIC 2840 : in.nTuples = nToInsert;
4131 tgl 810 ECB :
811 : /*
812 : * We may not actually insert new tuple because another picksplit may be
813 : * necessary due to too large value, but we will try to allocate enough
814 : * space to include it; and in any case it has to be included in the input
815 : * for the picksplit function. So don't increment nToInsert yet.
816 : */
738 tgl 817 GIC 2840 : in.datums[in.nTuples] =
738 tgl 818 CBC 2840 : isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
734 819 2840 : oldLeafs[in.nTuples] = newLeafTuple;
4131 820 2840 : in.nTuples++;
4131 tgl 821 ECB :
4131 tgl 822 GIC 2840 : memset(&out, 0, sizeof(out));
4131 tgl 823 ECB :
4046 tgl 824 GIC 2840 : if (!isNulls)
4046 tgl 825 ECB : {
826 : /*
827 : * Perform split using user-defined method.
828 : */
4046 tgl 829 GIC 2840 : procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
4046 tgl 830 CBC 2840 : FunctionCall2Coll(procinfo,
831 2840 : index->rd_indcollation[0],
4046 tgl 832 ECB : PointerGetDatum(&in),
833 : PointerGetDatum(&out));
834 :
835 : /*
836 : * Form new leaf tuples and count up the total space needed.
837 : */
4046 tgl 838 GIC 2840 : totalLeafSizes = 0;
4046 tgl 839 CBC 365399 : for (i = 0; i < in.nTuples; i++)
4046 tgl 840 ECB : {
734 tgl 841 GIC 362559 : if (state->leafTupDesc->natts > 1)
734 tgl 842 CBC 32206 : spgDeformLeafTuple(oldLeafs[i],
734 tgl 843 ECB : state->leafTupDesc,
844 : leafDatums,
845 : leafIsnulls,
846 : isNulls);
847 :
734 tgl 848 GIC 362559 : leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
734 tgl 849 CBC 362559 : leafIsnulls[spgKeyColumn] = false;
734 tgl 850 ECB :
734 tgl 851 GIC 362559 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
734 tgl 852 ECB : leafDatums,
853 : leafIsnulls);
4046 tgl 854 GIC 362559 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
4046 tgl 855 ECB : }
856 : }
857 : else
858 : {
859 : /*
860 : * Perform dummy split that puts all tuples into one node.
861 : * checkAllTheSame will override this and force allTheSame mode.
862 : */
4046 tgl 863 UIC 0 : out.hasPrefix = false;
4046 tgl 864 UBC 0 : out.nNodes = 1;
865 0 : out.nodeLabels = NULL;
866 0 : out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
4046 tgl 867 EUB :
868 : /*
869 : * Form new leaf tuples and count up the total space needed.
870 : */
4046 tgl 871 UIC 0 : totalLeafSizes = 0;
4046 tgl 872 UBC 0 : for (i = 0; i < in.nTuples; i++)
4046 tgl 873 EUB : {
734 tgl 874 UIC 0 : if (state->leafTupDesc->natts > 1)
734 tgl 875 UBC 0 : spgDeformLeafTuple(oldLeafs[i],
734 tgl 876 EUB : state->leafTupDesc,
877 : leafDatums,
878 : leafIsnulls,
879 : isNulls);
880 :
881 : /*
882 : * Nulls tree can contain only null key values.
883 : */
734 tgl 884 UIC 0 : leafDatums[spgKeyColumn] = (Datum) 0;
734 tgl 885 UBC 0 : leafIsnulls[spgKeyColumn] = true;
734 tgl 886 EUB :
734 tgl 887 UIC 0 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
734 tgl 888 EUB : leafDatums,
889 : leafIsnulls);
4046 tgl 890 UIC 0 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
4046 tgl 891 EUB : }
892 : }
893 :
894 : /*
895 : * Check to see if the picksplit function failed to separate the values,
896 : * ie, it put them all into the same child node. If so, select allTheSame
897 : * mode and create a random split instead. See comments for
898 : * checkAllTheSame as to why we need to know if the new leaf tuples could
899 : * fit on one page.
900 : */
4131 tgl 901 GIC 2840 : allTheSame = checkAllTheSame(&in, &out,
4131 tgl 902 ECB : totalLeafSizes > SPGIST_PAGE_CAPACITY,
903 : &includeNew);
904 :
905 : /*
906 : * If checkAllTheSame decided we must exclude the new tuple, don't
907 : * consider it any further.
908 : */
4131 tgl 909 GIC 2840 : if (includeNew)
4131 tgl 910 CBC 2840 : maxToInclude = in.nTuples;
4131 tgl 911 ECB : else
912 : {
4131 tgl 913 UIC 0 : maxToInclude = in.nTuples - 1;
4131 tgl 914 UBC 0 : totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
4131 tgl 915 EUB : }
916 :
917 : /*
918 : * Allocate per-node work arrays. Since checkAllTheSame could replace
919 : * out.nNodes with a value larger than the number of tuples on the input
920 : * page, we can't allocate these arrays before here.
921 : */
4131 tgl 922 GIC 2840 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
4131 tgl 923 CBC 2840 : leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
4131 tgl 924 ECB :
925 : /*
926 : * Form nodes of inner tuple and inner tuple itself
927 : */
4131 tgl 928 GIC 22035 : for (i = 0; i < out.nNodes; i++)
4131 tgl 929 ECB : {
4131 tgl 930 GIC 19195 : Datum label = (Datum) 0;
4046 tgl 931 CBC 19195 : bool labelisnull = (out.nodeLabels == NULL);
4131 tgl 932 ECB :
4046 tgl 933 GIC 19195 : if (!labelisnull)
4131 tgl 934 CBC 1791 : label = out.nodeLabels[i];
4046 935 19195 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
4131 tgl 936 ECB : }
4131 tgl 937 GIC 2840 : innerTuple = spgFormInnerTuple(state,
4131 tgl 938 CBC 2840 : out.hasPrefix, out.prefixDatum,
4131 tgl 939 ECB : out.nNodes, nodes);
4131 tgl 940 GIC 2840 : innerTuple->allTheSame = allTheSame;
4131 tgl 941 ECB :
942 : /*
943 : * Update nodes[] array to point into the newly formed innerTuple, so that
944 : * we can adjust their downlinks below.
945 : */
4131 tgl 946 GIC 22035 : SGITITERATE(innerTuple, i, node)
4131 tgl 947 ECB : {
4131 tgl 948 GIC 19195 : nodes[i] = node;
4131 tgl 949 ECB : }
950 :
951 : /*
952 : * Re-scan new leaf tuples and count up the space needed under each node.
953 : */
4131 tgl 954 GIC 365399 : for (i = 0; i < maxToInclude; i++)
4131 tgl 955 ECB : {
4131 tgl 956 GIC 362559 : n = out.mapTuplesToNodes[i];
4131 tgl 957 CBC 362559 : if (n < 0 || n >= out.nNodes)
4131 tgl 958 LBC 0 : elog(ERROR, "inconsistent result of SPGiST picksplit function");
4131 tgl 959 GBC 362559 : leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
4131 tgl 960 ECB : }
961 :
962 : /*
963 : * To perform the split, we must insert a new inner tuple, which can't go
964 : * on a leaf page; and unless we are splitting the root page, we must then
965 : * update the parent tuple's downlink to point to the inner tuple. If
966 : * there is room, we'll put the new inner tuple on the same page as the
967 : * parent tuple, otherwise we need another non-leaf buffer. But if the
968 : * parent page is the root, we can't add the new inner tuple there,
969 : * because the root page must have only one inner tuple.
970 : */
4131 tgl 971 GIC 2840 : xlrec.initInner = false;
4131 tgl 972 CBC 2840 : if (parent->buffer != InvalidBuffer &&
4046 973 2799 : !SpGistBlockIsRoot(parent->blkno) &&
4131 974 2619 : (SpGistPageGetFreeSpace(parent->page, 1) >=
975 2619 : innerTuple->size + sizeof(ItemIdData)))
4131 tgl 976 ECB : {
977 : /* New inner tuple will fit on parent page */
4131 tgl 978 GIC 2415 : newInnerBuffer = parent->buffer;
4131 tgl 979 ECB : }
4131 tgl 980 GIC 425 : else if (parent->buffer != InvalidBuffer)
4131 tgl 981 ECB : {
982 : /* Send tuple to page with next triple parity (see README) */
4131 tgl 983 GIC 768 : newInnerBuffer = SpGistGetBuffer(index,
2118 tgl 984 CBC 384 : GBUF_INNER_PARITY(parent->blkno + 1) |
4046 985 384 : (isNulls ? GBUF_NULLS : 0),
2118 986 384 : innerTuple->size + sizeof(ItemIdData),
4131 tgl 987 ECB : &xlrec.initInner);
988 : }
989 : else
990 : {
991 : /* Root page split ... inner tuple will go to root page */
4131 tgl 992 GIC 41 : newInnerBuffer = InvalidBuffer;
4131 tgl 993 ECB : }
994 :
995 : /*
996 : * The new leaf tuples converted from the existing ones should require the
997 : * same or less space, and therefore should all fit onto one page
998 : * (although that's not necessarily the current page, since we can't
999 : * delete the old tuples but only replace them with placeholders).
1000 : * However, the incoming new tuple might not also fit, in which case we
1001 : * might need another picksplit cycle to reduce it some more.
1002 : *
1003 : * If there's not room to put everything back onto the current page, then
1004 : * we decide on a per-node basis which tuples go to the new page. (We do
1005 : * it like that because leaf tuple chains can't cross pages, so we must
1006 : * place all leaf tuples belonging to the same parent node on the same
1007 : * page.)
1008 : *
1009 : * If we are splitting the root page (turning it from a leaf page into an
1010 : * inner page), then no leaf tuples can go back to the current page; they
1011 : * must all go somewhere else.
1012 : */
4046 tgl 1013 GIC 2840 : if (!SpGistBlockIsRoot(current->blkno))
4131 tgl 1014 CBC 2799 : currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
4131 tgl 1015 ECB : else
4131 tgl 1016 GIC 41 : currentFreeSpace = 0; /* prevent assigning any tuples to current */
4131 tgl 1017 ECB :
4131 tgl 1018 GIC 2840 : xlrec.initDest = false;
4131 tgl 1019 ECB :
4131 tgl 1020 GIC 2840 : if (totalLeafSizes <= currentFreeSpace)
4131 tgl 1021 ECB : {
1022 : /* All the leaf tuples will fit on current page */
4131 tgl 1023 GIC 12 : newLeafBuffer = InvalidBuffer;
4131 tgl 1024 ECB : /* mark new leaf tuple as included in insertions, if allowed */
4131 tgl 1025 GIC 12 : if (includeNew)
4131 tgl 1026 ECB : {
4131 tgl 1027 GIC 12 : nToInsert++;
4131 tgl 1028 CBC 12 : insertedNew = true;
4131 tgl 1029 ECB : }
4131 tgl 1030 GIC 1485 : for (i = 0; i < nToInsert; i++)
2118 tgl 1031 CBC 1473 : leafPageSelect[i] = 0; /* signifies current page */
4131 tgl 1032 ECB : }
4131 tgl 1033 GIC 2828 : else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
4131 tgl 1034 ECB : {
1035 : /*
1036 : * We're trying to split up a long value by repeated suffixing, but
1037 : * it's not going to fit yet. Don't bother allocating a second leaf
1038 : * buffer that we won't be able to use.
1039 : */
4131 tgl 1040 GIC 22 : newLeafBuffer = InvalidBuffer;
4131 tgl 1041 CBC 22 : Assert(includeNew);
1042 22 : Assert(nToInsert == 0);
4131 tgl 1043 ECB : }
1044 : else
1045 : {
1046 : /* We will need another leaf page */
1047 : uint8 *nodePageSelect;
1048 : int curspace;
1049 : int newspace;
1050 :
4046 tgl 1051 GIC 2806 : newLeafBuffer = SpGistGetBuffer(index,
2118 tgl 1052 ECB : GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
4131 tgl 1053 GIC 2806 : Min(totalLeafSizes,
4131 tgl 1054 ECB : SPGIST_PAGE_CAPACITY),
1055 : &xlrec.initDest);
1056 :
1057 : /*
1058 : * Attempt to assign node groups to the two pages. We might fail to
1059 : * do so, even if totalLeafSizes is less than the available space,
1060 : * because we can't split a group across pages.
1061 : */
4131 tgl 1062 GIC 2806 : nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
4131 tgl 1063 ECB :
4131 tgl 1064 GIC 2806 : curspace = currentFreeSpace;
2545 kgrittn 1065 CBC 2806 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
4131 tgl 1066 21955 : for (i = 0; i < out.nNodes; i++)
4131 tgl 1067 ECB : {
4131 tgl 1068 GIC 19149 : if (leafSizes[i] <= curspace)
4131 tgl 1069 ECB : {
3955 bruce 1070 GIC 12430 : nodePageSelect[i] = 0; /* signifies current page */
4131 tgl 1071 CBC 12430 : curspace -= leafSizes[i];
4131 tgl 1072 ECB : }
1073 : else
1074 : {
3955 bruce 1075 GIC 6719 : nodePageSelect[i] = 1; /* signifies new leaf page */
4131 tgl 1076 CBC 6719 : newspace -= leafSizes[i];
4131 tgl 1077 ECB : }
1078 : }
4131 tgl 1079 GIC 2806 : if (curspace >= 0 && newspace >= 0)
4131 tgl 1080 ECB : {
1081 : /* Successful assignment, so we can include the new leaf tuple */
4131 tgl 1082 GIC 2691 : if (includeNew)
4131 tgl 1083 ECB : {
4131 tgl 1084 GIC 2691 : nToInsert++;
4131 tgl 1085 CBC 2691 : insertedNew = true;
4131 tgl 1086 ECB : }
1087 : }
4131 tgl 1088 GIC 115 : else if (includeNew)
4131 tgl 1089 ECB : {
1090 : /* We must exclude the new leaf tuple from the split */
3955 bruce 1091 GIC 115 : int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
4131 tgl 1092 ECB :
4131 tgl 1093 GIC 115 : leafSizes[nodeOfNewTuple] -=
4131 tgl 1094 CBC 115 : newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
4131 tgl 1095 ECB :
1096 : /* Repeat the node assignment process --- should succeed now */
4131 tgl 1097 GIC 115 : curspace = currentFreeSpace;
2545 kgrittn 1098 CBC 115 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
4131 tgl 1099 557 : for (i = 0; i < out.nNodes; i++)
4131 tgl 1100 ECB : {
4131 tgl 1101 GIC 442 : if (leafSizes[i] <= curspace)
4131 tgl 1102 ECB : {
2118 tgl 1103 GIC 142 : nodePageSelect[i] = 0; /* signifies current page */
4131 tgl 1104 CBC 142 : curspace -= leafSizes[i];
4131 tgl 1105 ECB : }
1106 : else
1107 : {
2118 tgl 1108 GIC 300 : nodePageSelect[i] = 1; /* signifies new leaf page */
4131 tgl 1109 CBC 300 : newspace -= leafSizes[i];
4131 tgl 1110 ECB : }
1111 : }
4131 tgl 1112 GIC 115 : if (curspace < 0 || newspace < 0)
4131 tgl 1113 LBC 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
4131 tgl 1114 EUB : }
1115 : else
1116 : {
1117 : /* oops, we already excluded new tuple ... should not get here */
4131 tgl 1118 UIC 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
4131 tgl 1119 EUB : }
1120 : /* Expand the per-node assignments to be shown per leaf tuple */
4131 tgl 1121 GIC 363755 : for (i = 0; i < nToInsert; i++)
4131 tgl 1122 ECB : {
4131 tgl 1123 GIC 360949 : n = out.mapTuplesToNodes[i];
4131 tgl 1124 CBC 360949 : leafPageSelect[i] = nodePageSelect[n];
4131 tgl 1125 ECB : }
1126 : }
1127 :
1128 : /* Start preparing WAL record */
4131 tgl 1129 GIC 2840 : xlrec.nDelete = 0;
4131 tgl 1130 CBC 2840 : xlrec.initSrc = isNew;
4046 1131 2840 : xlrec.storesNulls = isNulls;
3062 heikki.linnakangas 1132 2840 : xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
4131 tgl 1133 ECB :
4131 tgl 1134 GIC 2840 : leafdata = leafptr = (char *) palloc(totalLeafSizes);
4131 tgl 1135 ECB :
1136 : /* Here we begin making the changes to the target pages */
4131 tgl 1137 GIC 2840 : START_CRIT_SECTION();
4131 tgl 1138 ECB :
1139 : /*
1140 : * Delete old leaf tuples from current buffer, except when we're splitting
1141 : * the root; in that case there's no need because we'll re-init the page
1142 : * below. We do this first to make room for reinserting new leaf tuples.
1143 : */
4046 tgl 1144 GIC 2840 : if (!SpGistBlockIsRoot(current->blkno))
4131 tgl 1145 ECB : {
1146 : /*
1147 : * Init buffer instead of deleting individual tuples, but only if
1148 : * there aren't any other live tuples and only during build; otherwise
1149 : * we need to set a redirection tuple for concurrent scans.
1150 : */
4131 tgl 1151 GIC 2799 : if (state->isBuild &&
4131 tgl 1152 CBC 2138 : nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1153 2138 : PageGetMaxOffsetNumber(current->page))
4131 tgl 1154 ECB : {
4046 tgl 1155 GIC 155 : SpGistInitBuffer(current->buffer,
4046 tgl 1156 ECB : SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
4131 tgl 1157 GIC 155 : xlrec.initSrc = true;
4131 tgl 1158 ECB : }
4131 tgl 1159 GIC 2644 : else if (isNew)
4131 tgl 1160 ECB : {
1161 : /* don't expose the freshly init'd buffer as a backup block */
4131 tgl 1162 GIC 22 : Assert(nToDelete == 0);
4131 tgl 1163 ECB : }
1164 : else
1165 : {
4131 tgl 1166 GIC 2622 : xlrec.nDelete = nToDelete;
4131 tgl 1167 ECB :
4131 tgl 1168 GIC 2622 : if (!state->isBuild)
4131 tgl 1169 ECB : {
1170 : /*
1171 : * Need to create redirect tuple (it will point to new inner
1172 : * tuple) but right now the new tuple's location is not known
1173 : * yet. So, set the redirection pointer to "impossible" value
1174 : * and remember its position to update tuple later.
1175 : */
4131 tgl 1176 GIC 639 : if (nToDelete > 0)
4131 tgl 1177 CBC 639 : redirectTuplePos = toDelete[0];
1178 639 : spgPageIndexMultiDelete(state, current->page,
4131 tgl 1179 ECB : toDelete, nToDelete,
1180 : SPGIST_REDIRECT,
1181 : SPGIST_PLACEHOLDER,
1182 : SPGIST_METAPAGE_BLKNO,
1183 : FirstOffsetNumber);
1184 : }
1185 : else
1186 : {
1187 : /*
1188 : * During index build there is not concurrent searches, so we
1189 : * don't need to create redirection tuple.
1190 : */
4131 tgl 1191 GIC 1983 : spgPageIndexMultiDelete(state, current->page,
4131 tgl 1192 ECB : toDelete, nToDelete,
1193 : SPGIST_PLACEHOLDER,
1194 : SPGIST_PLACEHOLDER,
1195 : InvalidBlockNumber,
1196 : InvalidOffsetNumber);
1197 : }
1198 : }
1199 : }
1200 :
1201 : /*
1202 : * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1203 : * nodes.
1204 : */
4131 tgl 1205 GIC 2840 : startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
4131 tgl 1206 CBC 365262 : for (i = 0; i < nToInsert; i++)
4131 tgl 1207 ECB : {
4131 tgl 1208 GIC 362422 : SpGistLeafTuple it = newLeafs[i];
3955 bruce 1209 ECB : Buffer leafBuffer;
1210 : BlockNumber leafBlock;
1211 : OffsetNumber newoffset;
1212 :
1213 : /* Which page is it going to? */
4131 tgl 1214 GIC 362422 : leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
4131 tgl 1215 CBC 362422 : leafBlock = BufferGetBlockNumber(leafBuffer);
4131 tgl 1216 ECB :
1217 : /* Link tuple into correct chain for its node */
4131 tgl 1218 GIC 362422 : n = out.mapTuplesToNodes[i];
4131 tgl 1219 ECB :
4131 tgl 1220 GIC 362422 : if (ItemPointerIsValid(&nodes[n]->t_tid))
4131 tgl 1221 ECB : {
4131 tgl 1222 GIC 351516 : Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
734 tgl 1223 CBC 351516 : SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
4131 tgl 1224 ECB : }
1225 : else
734 tgl 1226 GIC 10906 : SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
4131 tgl 1227 ECB :
1228 : /* Insert it on page */
2545 kgrittn 1229 GIC 362422 : newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
4131 tgl 1230 CBC 362422 : (Item) it, it->size,
1231 362422 : &startOffsets[leafPageSelect[i]],
4131 tgl 1232 ECB : false);
4131 tgl 1233 GIC 362422 : toInsert[i] = newoffset;
4131 tgl 1234 ECB :
1235 : /* ... and complete the chain linking */
4131 tgl 1236 GIC 362422 : ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
4131 tgl 1237 ECB :
1238 : /* Also copy leaf tuple into WAL data */
4131 tgl 1239 GIC 362422 : memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
4131 tgl 1240 CBC 362422 : leafptr += newLeafs[i]->size;
4131 tgl 1241 ECB : }
1242 :
1243 : /*
1244 : * We're done modifying the other leaf buffer (if any), so mark it dirty.
1245 : * current->buffer will be marked below, after we're entirely done
1246 : * modifying it.
1247 : */
4131 tgl 1248 GIC 2840 : if (newLeafBuffer != InvalidBuffer)
4131 tgl 1249 ECB : {
4131 tgl 1250 GIC 2806 : MarkBufferDirty(newLeafBuffer);
4131 tgl 1251 ECB : }
1252 :
1253 : /* Remember current buffer, since we're about to change "current" */
4131 tgl 1254 GIC 2840 : saveCurrent = *current;
4131 tgl 1255 ECB :
1256 : /*
1257 : * Store the new innerTuple
1258 : */
4131 tgl 1259 GIC 2840 : if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
4131 tgl 1260 ECB : {
1261 : /*
1262 : * new inner tuple goes to parent page
1263 : */
4131 tgl 1264 GIC 2415 : Assert(current->buffer != parent->buffer);
4131 tgl 1265 ECB :
1266 : /* Repoint "current" at the new inner tuple */
4131 tgl 1267 GIC 2415 : current->blkno = parent->blkno;
4131 tgl 1268 CBC 2415 : current->buffer = parent->buffer;
1269 2415 : current->page = parent->page;
1270 2415 : xlrec.offnumInner = current->offnum =
1271 2415 : SpGistPageAddNewItem(state, current->page,
1272 2415 : (Item) innerTuple, innerTuple->size,
4131 tgl 1273 ECB : NULL, false);
1274 :
1275 : /*
1276 : * Update parent node link and mark parent page dirty
1277 : */
3062 heikki.linnakangas 1278 GIC 2415 : xlrec.innerIsParent = true;
4131 tgl 1279 CBC 2415 : xlrec.offnumParent = parent->offnum;
1280 2415 : xlrec.nodeI = parent->node;
1281 2415 : saveNodeLink(index, parent, current->blkno, current->offnum);
4131 tgl 1282 ECB :
1283 : /*
1284 : * Update redirection link (in old current buffer)
1285 : */
4131 tgl 1286 GIC 2415 : if (redirectTuplePos != InvalidOffsetNumber)
4131 tgl 1287 CBC 577 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1288 577 : current->blkno, current->offnum);
4131 tgl 1289 ECB :
1290 : /* Done modifying old current buffer, mark it dirty */
4131 tgl 1291 GIC 2415 : MarkBufferDirty(saveCurrent.buffer);
4131 tgl 1292 ECB : }
4131 tgl 1293 GIC 425 : else if (parent->buffer != InvalidBuffer)
4131 tgl 1294 ECB : {
1295 : /*
1296 : * new inner tuple will be stored on a new page
1297 : */
4131 tgl 1298 GIC 384 : Assert(newInnerBuffer != InvalidBuffer);
4131 tgl 1299 ECB :
1300 : /* Repoint "current" at the new inner tuple */
4131 tgl 1301 GIC 384 : current->buffer = newInnerBuffer;
4131 tgl 1302 CBC 384 : current->blkno = BufferGetBlockNumber(current->buffer);
2545 kgrittn 1303 384 : current->page = BufferGetPage(current->buffer);
4131 tgl 1304 384 : xlrec.offnumInner = current->offnum =
1305 384 : SpGistPageAddNewItem(state, current->page,
1306 384 : (Item) innerTuple, innerTuple->size,
4131 tgl 1307 ECB : NULL, false);
1308 :
1309 : /* Done modifying new current buffer, mark it dirty */
4131 tgl 1310 GIC 384 : MarkBufferDirty(current->buffer);
4131 tgl 1311 ECB :
1312 : /*
1313 : * Update parent node link and mark parent page dirty
1314 : */
3062 heikki.linnakangas 1315 GIC 384 : xlrec.innerIsParent = (parent->buffer == current->buffer);
4131 tgl 1316 CBC 384 : xlrec.offnumParent = parent->offnum;
1317 384 : xlrec.nodeI = parent->node;
1318 384 : saveNodeLink(index, parent, current->blkno, current->offnum);
4131 tgl 1319 ECB :
1320 : /*
1321 : * Update redirection link (in old current buffer)
1322 : */
4131 tgl 1323 GIC 384 : if (redirectTuplePos != InvalidOffsetNumber)
4131 tgl 1324 CBC 62 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1325 62 : current->blkno, current->offnum);
4131 tgl 1326 ECB :
1327 : /* Done modifying old current buffer, mark it dirty */
4131 tgl 1328 GIC 384 : MarkBufferDirty(saveCurrent.buffer);
4131 tgl 1329 ECB : }
1330 : else
1331 : {
1332 : /*
1333 : * Splitting root page, which was a leaf but now becomes inner page
1334 : * (and so "current" continues to point at it)
1335 : */
4046 tgl 1336 GIC 41 : Assert(SpGistBlockIsRoot(current->blkno));
4131 tgl 1337 CBC 41 : Assert(redirectTuplePos == InvalidOffsetNumber);
4131 tgl 1338 ECB :
4046 tgl 1339 GIC 41 : SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
4131 tgl 1340 CBC 41 : xlrec.initInner = true;
3062 heikki.linnakangas 1341 41 : xlrec.innerIsParent = false;
4131 tgl 1342 ECB :
4131 tgl 1343 GIC 41 : xlrec.offnumInner = current->offnum =
4131 tgl 1344 CBC 41 : PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
4131 tgl 1345 ECB : InvalidOffsetNumber, false, false);
4131 tgl 1346 GIC 41 : if (current->offnum != FirstOffsetNumber)
4131 tgl 1347 LBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
4131 tgl 1348 EUB : innerTuple->size);
1349 :
1350 : /* No parent link to update, nor redirection to do */
4131 tgl 1351 GIC 41 : xlrec.offnumParent = InvalidOffsetNumber;
4131 tgl 1352 CBC 41 : xlrec.nodeI = 0;
4131 tgl 1353 ECB :
1354 : /* Done modifying new current buffer, mark it dirty */
4131 tgl 1355 GIC 41 : MarkBufferDirty(current->buffer);
4131 tgl 1356 ECB :
1357 : /* saveCurrent doesn't represent a different buffer */
4131 tgl 1358 GIC 41 : saveCurrent.buffer = InvalidBuffer;
4131 tgl 1359 ECB : }
1360 :
1467 heikki.linnakangas 1361 GIC 2840 : if (RelationNeedsWAL(index) && !state->isBuild)
4131 tgl 1362 ECB : {
1363 : XLogRecPtr recptr;
1364 : int flags;
1365 :
3062 heikki.linnakangas 1366 GIC 671 : XLogBeginInsert();
3062 heikki.linnakangas 1367 ECB :
3062 heikki.linnakangas 1368 GIC 671 : xlrec.nInsert = nToInsert;
3062 heikki.linnakangas 1369 CBC 671 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
3062 heikki.linnakangas 1370 ECB :
3062 heikki.linnakangas 1371 GIC 671 : XLogRegisterData((char *) toDelete,
3062 heikki.linnakangas 1372 CBC 671 : sizeof(OffsetNumber) * xlrec.nDelete);
1373 671 : XLogRegisterData((char *) toInsert,
1374 671 : sizeof(OffsetNumber) * xlrec.nInsert);
1375 671 : XLogRegisterData((char *) leafPageSelect,
1376 671 : sizeof(uint8) * xlrec.nInsert);
1377 671 : XLogRegisterData((char *) innerTuple, innerTuple->size);
1378 671 : XLogRegisterData(leafdata, leafptr - leafdata);
3062 heikki.linnakangas 1379 ECB :
1380 : /* Old leaf page */
3062 heikki.linnakangas 1381 GIC 671 : if (BufferIsValid(saveCurrent.buffer))
2820 heikki.linnakangas 1382 ECB : {
2820 heikki.linnakangas 1383 GIC 661 : flags = REGBUF_STANDARD;
2820 heikki.linnakangas 1384 CBC 661 : if (xlrec.initSrc)
1385 22 : flags |= REGBUF_WILL_INIT;
3062 1386 661 : XLogRegisterBuffer(0, saveCurrent.buffer, flags);
2820 heikki.linnakangas 1387 ECB : }
1388 :
1389 : /* New leaf page */
3062 heikki.linnakangas 1390 GIC 671 : if (BufferIsValid(newLeafBuffer))
3062 heikki.linnakangas 1391 ECB : {
3062 heikki.linnakangas 1392 GIC 649 : flags = REGBUF_STANDARD;
3062 heikki.linnakangas 1393 CBC 649 : if (xlrec.initDest)
1394 604 : flags |= REGBUF_WILL_INIT;
1395 649 : XLogRegisterBuffer(1, newLeafBuffer, flags);
3062 heikki.linnakangas 1396 ECB : }
1397 :
1398 : /* Inner page */
2820 heikki.linnakangas 1399 GIC 671 : flags = REGBUF_STANDARD;
2820 heikki.linnakangas 1400 CBC 671 : if (xlrec.initInner)
1401 20 : flags |= REGBUF_WILL_INIT;
1402 671 : XLogRegisterBuffer(2, current->buffer, flags);
2820 heikki.linnakangas 1403 ECB :
1404 : /* Parent page, if different from inner page */
3062 heikki.linnakangas 1405 GIC 671 : if (parent->buffer != InvalidBuffer)
3062 heikki.linnakangas 1406 ECB : {
3062 heikki.linnakangas 1407 GIC 661 : if (parent->buffer != current->buffer)
3062 heikki.linnakangas 1408 CBC 62 : XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
3062 heikki.linnakangas 1409 ECB : else
3062 heikki.linnakangas 1410 GIC 599 : Assert(xlrec.innerIsParent);
3062 heikki.linnakangas 1411 ECB : }
1412 :
1413 : /* Issue the WAL record */
3062 heikki.linnakangas 1414 GIC 671 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
4131 tgl 1415 ECB :
1416 : /* Update page LSNs on all affected pages */
4131 tgl 1417 GIC 671 : if (newLeafBuffer != InvalidBuffer)
4131 tgl 1418 ECB : {
2545 kgrittn 1419 GIC 649 : Page page = BufferGetPage(newLeafBuffer);
2545 kgrittn 1420 ECB :
4131 tgl 1421 GIC 649 : PageSetLSN(page, recptr);
4131 tgl 1422 ECB : }
1423 :
4131 tgl 1424 GIC 671 : if (saveCurrent.buffer != InvalidBuffer)
4131 tgl 1425 ECB : {
2545 kgrittn 1426 GIC 661 : Page page = BufferGetPage(saveCurrent.buffer);
2545 kgrittn 1427 ECB :
4131 tgl 1428 GIC 661 : PageSetLSN(page, recptr);
4131 tgl 1429 ECB : }
1430 :
4131 tgl 1431 GIC 671 : PageSetLSN(current->page, recptr);
4131 tgl 1432 ECB :
4131 tgl 1433 GIC 671 : if (parent->buffer != InvalidBuffer)
2545 kgrittn 1434 ECB : {
4131 tgl 1435 GIC 661 : PageSetLSN(parent->page, recptr);
2545 kgrittn 1436 ECB : }
1437 : }
1438 :
4131 tgl 1439 GIC 2840 : END_CRIT_SECTION();
4131 tgl 1440 ECB :
1441 : /* Update local free-space cache and unlock buffers */
4131 tgl 1442 GIC 2840 : if (newLeafBuffer != InvalidBuffer)
4131 tgl 1443 ECB : {
4131 tgl 1444 GIC 2806 : SpGistSetLastUsedPage(index, newLeafBuffer);
4131 tgl 1445 CBC 2806 : UnlockReleaseBuffer(newLeafBuffer);
4131 tgl 1446 ECB : }
4131 tgl 1447 GIC 2840 : if (saveCurrent.buffer != InvalidBuffer)
4131 tgl 1448 ECB : {
4131 tgl 1449 GIC 2799 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
4131 tgl 1450 CBC 2799 : UnlockReleaseBuffer(saveCurrent.buffer);
4131 tgl 1451 ECB : }
1452 :
4131 tgl 1453 GIC 2840 : return insertedNew;
4131 tgl 1454 ECB : }
1455 :
1456 : /*
1457 : * spgMatchNode action: descend to N'th child node of current inner tuple
1458 : */
1459 : static void
4131 tgl 1460 GIC 9331580 : spgMatchNodeAction(Relation index, SpGistState *state,
4131 tgl 1461 ECB : SpGistInnerTuple innerTuple,
1462 : SPPageDesc *current, SPPageDesc *parent, int nodeN)
1463 : {
1464 : int i;
1465 : SpGistNodeTuple node;
1466 :
1467 : /* Release previous parent buffer if any */
4131 tgl 1468 GIC 9331580 : if (parent->buffer != InvalidBuffer &&
4131 tgl 1469 CBC 8936882 : parent->buffer != current->buffer)
4131 tgl 1470 ECB : {
4131 tgl 1471 GIC 392309 : SpGistSetLastUsedPage(index, parent->buffer);
4131 tgl 1472 CBC 392309 : UnlockReleaseBuffer(parent->buffer);
4131 tgl 1473 ECB : }
1474 :
1475 : /* Repoint parent to specified node of current inner tuple */
4131 tgl 1476 GIC 9331580 : parent->blkno = current->blkno;
4131 tgl 1477 CBC 9331580 : parent->buffer = current->buffer;
1478 9331580 : parent->page = current->page;
1479 9331580 : parent->offnum = current->offnum;
1480 9331580 : parent->node = nodeN;
4131 tgl 1481 ECB :
1482 : /* Locate that node */
4131 tgl 1483 GIC 15631807 : SGITITERATE(innerTuple, i, node)
4131 tgl 1484 ECB : {
4131 tgl 1485 GIC 15631807 : if (i == nodeN)
4131 tgl 1486 CBC 9331580 : break;
4131 tgl 1487 ECB : }
1488 :
4131 tgl 1489 GIC 9331580 : if (i != nodeN)
4131 tgl 1490 LBC 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
4131 tgl 1491 EUB : nodeN);
1492 :
1493 : /* Point current to the downlink location, if any */
4131 tgl 1494 GIC 9331580 : if (ItemPointerIsValid(&node->t_tid))
4131 tgl 1495 ECB : {
4131 tgl 1496 GIC 9330543 : current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
4131 tgl 1497 CBC 9330543 : current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
4131 tgl 1498 ECB : }
1499 : else
1500 : {
1501 : /* Downlink is empty, so we'll need to find a new page */
4131 tgl 1502 GIC 1037 : current->blkno = InvalidBlockNumber;
4131 tgl 1503 CBC 1037 : current->offnum = InvalidOffsetNumber;
4131 tgl 1504 ECB : }
1505 :
4131 tgl 1506 GIC 9331580 : current->buffer = InvalidBuffer;
4131 tgl 1507 CBC 9331580 : current->page = NULL;
1508 9331580 : }
4131 tgl 1509 ECB :
1510 : /*
1511 : * spgAddNode action: add a node to the inner tuple at current
1512 : */
1513 : static void
4131 tgl 1514 GIC 783 : spgAddNodeAction(Relation index, SpGistState *state,
4131 tgl 1515 ECB : SpGistInnerTuple innerTuple,
1516 : SPPageDesc *current, SPPageDesc *parent,
1517 : int nodeN, Datum nodeLabel)
1518 : {
1519 : SpGistInnerTuple newInnerTuple;
1520 : spgxlogAddNode xlrec;
1521 :
1522 : /* Should not be applied to nulls */
4046 tgl 1523 GIC 783 : Assert(!SpGistPageStoresNulls(current->page));
4046 tgl 1524 ECB :
1525 : /* Construct new inner tuple with additional node */
4131 tgl 1526 GIC 783 : newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
4131 tgl 1527 ECB :
1528 : /* Prepare WAL record */
4131 tgl 1529 GIC 783 : STORE_STATE(state, xlrec.stateSrc);
4131 tgl 1530 CBC 783 : xlrec.offnum = current->offnum;
4131 tgl 1531 ECB :
1532 : /* we don't fill these unless we need to change the parent downlink */
3062 heikki.linnakangas 1533 GIC 783 : xlrec.parentBlk = -1;
4131 tgl 1534 CBC 783 : xlrec.offnumParent = InvalidOffsetNumber;
1535 783 : xlrec.nodeI = 0;
4131 tgl 1536 ECB :
1537 : /* we don't fill these unless tuple has to be moved */
4131 tgl 1538 GIC 783 : xlrec.offnumNew = InvalidOffsetNumber;
4131 tgl 1539 CBC 783 : xlrec.newPage = false;
4131 tgl 1540 ECB :
4131 tgl 1541 GIC 783 : if (PageGetExactFreeSpace(current->page) >=
4131 tgl 1542 CBC 783 : newInnerTuple->size - innerTuple->size)
4131 tgl 1543 ECB : {
1544 : /*
1545 : * We can replace the inner tuple by new version in-place
1546 : */
4131 tgl 1547 GIC 780 : START_CRIT_SECTION();
4131 tgl 1548 ECB :
4131 tgl 1549 GIC 780 : PageIndexTupleDelete(current->page, current->offnum);
4131 tgl 1550 CBC 780 : if (PageAddItem(current->page,
4131 tgl 1551 ECB : (Item) newInnerTuple, newInnerTuple->size,
4131 tgl 1552 GIC 780 : current->offnum, false, false) != current->offnum)
4131 tgl 1553 LBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
4131 tgl 1554 EUB : newInnerTuple->size);
1555 :
4131 tgl 1556 GIC 780 : MarkBufferDirty(current->buffer);
4131 tgl 1557 ECB :
1467 heikki.linnakangas 1558 GIC 780 : if (RelationNeedsWAL(index) && !state->isBuild)
4131 tgl 1559 ECB : {
1560 : XLogRecPtr recptr;
1561 :
3062 heikki.linnakangas 1562 GIC 355 : XLogBeginInsert();
3062 heikki.linnakangas 1563 CBC 355 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1564 355 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
3062 heikki.linnakangas 1565 ECB :
3062 heikki.linnakangas 1566 GIC 355 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
3062 heikki.linnakangas 1567 ECB :
3062 heikki.linnakangas 1568 GIC 355 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
4131 tgl 1569 ECB :
4131 tgl 1570 GIC 355 : PageSetLSN(current->page, recptr);
4131 tgl 1571 ECB : }
1572 :
4131 tgl 1573 GIC 780 : END_CRIT_SECTION();
4131 tgl 1574 ECB : }
1575 : else
1576 : {
1577 : /*
1578 : * move inner tuple to another page, and update parent
1579 : */
1580 : SpGistDeadTuple dt;
1581 : SPPageDesc saveCurrent;
1582 :
1583 : /*
1584 : * It should not be possible to get here for the root page, since we
1585 : * allow only one inner tuple on the root page, and spgFormInnerTuple
1586 : * always checks that inner tuples don't exceed the size of a page.
1587 : */
4046 tgl 1588 GIC 3 : if (SpGistBlockIsRoot(current->blkno))
4131 tgl 1589 LBC 0 : elog(ERROR, "cannot enlarge root tuple any more");
4131 tgl 1590 GBC 3 : Assert(parent->buffer != InvalidBuffer);
4131 tgl 1591 ECB :
4131 tgl 1592 GIC 3 : saveCurrent = *current;
4131 tgl 1593 ECB :
4131 tgl 1594 GIC 3 : xlrec.offnumParent = parent->offnum;
4131 tgl 1595 CBC 3 : xlrec.nodeI = parent->node;
4131 tgl 1596 ECB :
1597 : /*
1598 : * obtain new buffer with the same parity as current, since it will be
1599 : * a child of same parent tuple
1600 : */
4131 tgl 1601 GIC 6 : current->buffer = SpGistGetBuffer(index,
4131 tgl 1602 CBC 3 : GBUF_INNER_PARITY(current->blkno),
2118 1603 3 : newInnerTuple->size + sizeof(ItemIdData),
4131 tgl 1604 ECB : &xlrec.newPage);
4131 tgl 1605 GIC 3 : current->blkno = BufferGetBlockNumber(current->buffer);
2545 kgrittn 1606 CBC 3 : current->page = BufferGetPage(current->buffer);
4131 tgl 1607 ECB :
1608 : /*
1609 : * Let's just make real sure new current isn't same as old. Right now
1610 : * that's impossible, but if SpGistGetBuffer ever got smart enough to
1611 : * delete placeholder tuples before checking space, maybe it wouldn't
1612 : * be impossible. The case would appear to work except that WAL
1613 : * replay would be subtly wrong, so I think a mere assert isn't enough
1614 : * here.
1615 : */
3062 heikki.linnakangas 1616 GIC 3 : if (current->blkno == saveCurrent.blkno)
3955 bruce 1617 LBC 0 : elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
4131 tgl 1618 EUB :
1619 : /*
1620 : * New current and parent buffer will both be modified; but note that
1621 : * parent buffer could be same as either new or old current.
1622 : */
3062 heikki.linnakangas 1623 GIC 3 : if (parent->buffer == saveCurrent.buffer)
3062 heikki.linnakangas 1624 LBC 0 : xlrec.parentBlk = 0;
3062 heikki.linnakangas 1625 GBC 3 : else if (parent->buffer == current->buffer)
3062 heikki.linnakangas 1626 LBC 0 : xlrec.parentBlk = 1;
3062 heikki.linnakangas 1627 EUB : else
3062 heikki.linnakangas 1628 GIC 3 : xlrec.parentBlk = 2;
4131 tgl 1629 ECB :
4131 tgl 1630 GIC 3 : START_CRIT_SECTION();
4131 tgl 1631 ECB :
1632 : /* insert new ... */
4131 tgl 1633 GIC 3 : xlrec.offnumNew = current->offnum =
4131 tgl 1634 CBC 3 : SpGistPageAddNewItem(state, current->page,
1635 3 : (Item) newInnerTuple, newInnerTuple->size,
4131 tgl 1636 ECB : NULL, false);
1637 :
4131 tgl 1638 GIC 3 : MarkBufferDirty(current->buffer);
4131 tgl 1639 ECB :
1640 : /* update parent's downlink and mark parent page dirty */
4131 tgl 1641 GIC 3 : saveNodeLink(index, parent, current->blkno, current->offnum);
4131 tgl 1642 ECB :
1643 : /*
1644 : * Replace old tuple with a placeholder or redirection tuple. Unless
1645 : * doing an index build, we have to insert a redirection tuple for
1646 : * possible concurrent scans. We can't just delete it in any case,
1647 : * because that could change the offsets of other tuples on the page,
1648 : * breaking downlinks from their parents.
1649 : */
4131 tgl 1650 GIC 3 : if (state->isBuild)
4131 tgl 1651 LBC 0 : dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
4131 tgl 1652 EUB : InvalidBlockNumber, InvalidOffsetNumber);
1653 : else
4131 tgl 1654 GIC 3 : dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
4131 tgl 1655 CBC 3 : current->blkno, current->offnum);
4131 tgl 1656 ECB :
4131 tgl 1657 GIC 3 : PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
4131 tgl 1658 CBC 3 : if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
4131 tgl 1659 ECB : saveCurrent.offnum,
4131 tgl 1660 GIC 3 : false, false) != saveCurrent.offnum)
4131 tgl 1661 LBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
4131 tgl 1662 EUB : dt->size);
1663 :
4131 tgl 1664 GIC 3 : if (state->isBuild)
4131 tgl 1665 LBC 0 : SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
4131 tgl 1666 EUB : else
4131 tgl 1667 GIC 3 : SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
4131 tgl 1668 ECB :
4131 tgl 1669 GIC 3 : MarkBufferDirty(saveCurrent.buffer);
4131 tgl 1670 ECB :
1467 heikki.linnakangas 1671 GIC 3 : if (RelationNeedsWAL(index) && !state->isBuild)
4131 tgl 1672 ECB : {
1673 : XLogRecPtr recptr;
1674 : int flags;
1675 :
3062 heikki.linnakangas 1676 GIC 3 : XLogBeginInsert();
3062 heikki.linnakangas 1677 ECB :
1678 : /* orig page */
3062 heikki.linnakangas 1679 GIC 3 : XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
3062 heikki.linnakangas 1680 ECB : /* new page */
2820 heikki.linnakangas 1681 GIC 3 : flags = REGBUF_STANDARD;
2820 heikki.linnakangas 1682 CBC 3 : if (xlrec.newPage)
1683 3 : flags |= REGBUF_WILL_INIT;
1684 3 : XLogRegisterBuffer(1, current->buffer, flags);
3062 heikki.linnakangas 1685 ECB : /* parent page (if different from orig and new) */
3062 heikki.linnakangas 1686 GIC 3 : if (xlrec.parentBlk == 2)
3062 heikki.linnakangas 1687 CBC 3 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
3062 heikki.linnakangas 1688 ECB :
3062 heikki.linnakangas 1689 GIC 3 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
3062 heikki.linnakangas 1690 CBC 3 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
3062 heikki.linnakangas 1691 ECB :
3062 heikki.linnakangas 1692 GIC 3 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
4131 tgl 1693 ECB :
1694 : /* we don't bother to check if any of these are redundant */
4131 tgl 1695 GIC 3 : PageSetLSN(current->page, recptr);
4131 tgl 1696 CBC 3 : PageSetLSN(parent->page, recptr);
1697 3 : PageSetLSN(saveCurrent.page, recptr);
4131 tgl 1698 ECB : }
1699 :
4131 tgl 1700 GIC 3 : END_CRIT_SECTION();
4131 tgl 1701 ECB :
1702 : /* Release saveCurrent if it's not same as current or parent */
4131 tgl 1703 GIC 3 : if (saveCurrent.buffer != current->buffer &&
4131 tgl 1704 CBC 3 : saveCurrent.buffer != parent->buffer)
4131 tgl 1705 ECB : {
4131 tgl 1706 GIC 3 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
4131 tgl 1707 CBC 3 : UnlockReleaseBuffer(saveCurrent.buffer);
4131 tgl 1708 ECB : }
1709 : }
4131 tgl 1710 GIC 783 : }
4131 tgl 1711 ECB :
1712 : /*
1713 : * spgSplitNode action: split inner tuple at current into prefix and postfix
1714 : */
1715 : static void
4131 tgl 1716 GIC 321 : spgSplitNodeAction(Relation index, SpGistState *state,
4131 tgl 1717 ECB : SpGistInnerTuple innerTuple,
1718 : SPPageDesc *current, spgChooseOut *out)
1719 : {
1720 : SpGistInnerTuple prefixTuple,
1721 : postfixTuple;
1722 : SpGistNodeTuple node,
1723 : *nodes;
1724 : BlockNumber postfixBlkno;
1725 : OffsetNumber postfixOffset;
1726 : int i;
1727 : spgxlogSplitTuple xlrec;
4131 tgl 1728 GIC 321 : Buffer newBuffer = InvalidBuffer;
4131 tgl 1729 ECB :
1730 : /* Should not be applied to nulls */
4046 tgl 1731 GIC 321 : Assert(!SpGistPageStoresNulls(current->page));
4046 tgl 1732 ECB :
1733 : /* Check opclass gave us sane values */
2420 tgl 1734 GIC 321 : if (out->result.splitTuple.prefixNNodes <= 0 ||
2420 tgl 1735 CBC 321 : out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
2420 tgl 1736 LBC 0 : elog(ERROR, "invalid number of prefix nodes: %d",
2420 tgl 1737 EUB : out->result.splitTuple.prefixNNodes);
2420 tgl 1738 GIC 321 : if (out->result.splitTuple.childNodeN < 0 ||
2420 tgl 1739 CBC 321 : out->result.splitTuple.childNodeN >=
1740 321 : out->result.splitTuple.prefixNNodes)
2420 tgl 1741 LBC 0 : elog(ERROR, "invalid child node number: %d",
2420 tgl 1742 EUB : out->result.splitTuple.childNodeN);
1743 :
1744 : /*
1745 : * Construct new prefix tuple with requested number of nodes. We'll fill
1746 : * in the childNodeN'th node's downlink below.
1747 : */
2420 tgl 1748 GIC 321 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
2420 tgl 1749 CBC 321 : out->result.splitTuple.prefixNNodes);
2420 tgl 1750 ECB :
2420 tgl 1751 GIC 642 : for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
2420 tgl 1752 ECB : {
2420 tgl 1753 GIC 321 : Datum label = (Datum) 0;
2420 tgl 1754 ECB : bool labelisnull;
1755 :
2420 tgl 1756 GIC 321 : labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
2420 tgl 1757 CBC 321 : if (!labelisnull)
1758 321 : label = out->result.splitTuple.prefixNodeLabels[i];
1759 321 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
2420 tgl 1760 ECB : }
1761 :
4131 tgl 1762 GIC 321 : prefixTuple = spgFormInnerTuple(state,
4131 tgl 1763 CBC 321 : out->result.splitTuple.prefixHasPrefix,
4131 tgl 1764 ECB : out->result.splitTuple.prefixPrefixDatum,
1765 : out->result.splitTuple.prefixNNodes,
1766 : nodes);
1767 :
1768 : /* it must fit in the space that innerTuple now occupies */
4131 tgl 1769 GIC 321 : if (prefixTuple->size > innerTuple->size)
4131 tgl 1770 LBC 0 : elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
4131 tgl 1771 EUB :
1772 : /*
1773 : * Construct new postfix tuple, containing all nodes of innerTuple with
1774 : * same node datums, but with the prefix specified by the picksplit
1775 : * function.
1776 : */
4131 tgl 1777 GIC 321 : nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
4131 tgl 1778 CBC 1121 : SGITITERATE(innerTuple, i, node)
4131 tgl 1779 ECB : {
4131 tgl 1780 GIC 800 : nodes[i] = node;
4131 tgl 1781 ECB : }
1782 :
4131 tgl 1783 GIC 321 : postfixTuple = spgFormInnerTuple(state,
4131 tgl 1784 CBC 321 : out->result.splitTuple.postfixHasPrefix,
2118 tgl 1785 ECB : out->result.splitTuple.postfixPrefixDatum,
4131 tgl 1786 GIC 321 : innerTuple->nNodes, nodes);
4131 tgl 1787 ECB :
1788 : /* Postfix tuple is allTheSame if original tuple was */
4131 tgl 1789 GIC 321 : postfixTuple->allTheSame = innerTuple->allTheSame;
4131 tgl 1790 ECB :
1791 : /* prep data for WAL record */
4131 tgl 1792 GIC 321 : xlrec.newPage = false;
4131 tgl 1793 ECB :
1794 : /*
1795 : * If we can't fit both tuples on the current page, get a new page for the
1796 : * postfix tuple. In particular, can't split to the root page.
1797 : *
1798 : * For the space calculation, note that prefixTuple replaces innerTuple
1799 : * but postfixTuple will be a new entry.
1800 : */
4046 tgl 1801 GIC 321 : if (SpGistBlockIsRoot(current->blkno) ||
4131 tgl 1802 CBC 315 : SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1803 315 : prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
4131 tgl 1804 ECB : {
1805 : /*
1806 : * Choose page with next triple parity, because postfix tuple is a
1807 : * child of prefix one
1808 : */
4131 tgl 1809 GIC 84 : newBuffer = SpGistGetBuffer(index,
4131 tgl 1810 CBC 84 : GBUF_INNER_PARITY(current->blkno + 1),
1811 84 : postfixTuple->size + sizeof(ItemIdData),
4131 tgl 1812 ECB : &xlrec.newPage);
1813 : }
1814 :
4131 tgl 1815 GIC 321 : START_CRIT_SECTION();
4131 tgl 1816 ECB :
1817 : /*
1818 : * Replace old tuple by prefix tuple
1819 : */
4131 tgl 1820 GIC 321 : PageIndexTupleDelete(current->page, current->offnum);
4131 tgl 1821 CBC 321 : xlrec.offnumPrefix = PageAddItem(current->page,
4131 tgl 1822 ECB : (Item) prefixTuple, prefixTuple->size,
1823 : current->offnum, false, false);
4131 tgl 1824 GIC 321 : if (xlrec.offnumPrefix != current->offnum)
4131 tgl 1825 LBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
4131 tgl 1826 EUB : prefixTuple->size);
1827 :
1828 : /*
1829 : * put postfix tuple into appropriate page
1830 : */
4131 tgl 1831 GIC 321 : if (newBuffer == InvalidBuffer)
4131 tgl 1832 ECB : {
3062 heikki.linnakangas 1833 GIC 237 : postfixBlkno = current->blkno;
4131 tgl 1834 CBC 237 : xlrec.offnumPostfix = postfixOffset =
1835 237 : SpGistPageAddNewItem(state, current->page,
1836 237 : (Item) postfixTuple, postfixTuple->size,
4131 tgl 1837 ECB : NULL, false);
3062 heikki.linnakangas 1838 GIC 237 : xlrec.postfixBlkSame = true;
4131 tgl 1839 ECB : }
1840 : else
1841 : {
3062 heikki.linnakangas 1842 GIC 84 : postfixBlkno = BufferGetBlockNumber(newBuffer);
4131 tgl 1843 CBC 84 : xlrec.offnumPostfix = postfixOffset =
2545 kgrittn 1844 84 : SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
4131 tgl 1845 84 : (Item) postfixTuple, postfixTuple->size,
4131 tgl 1846 ECB : NULL, false);
4131 tgl 1847 GIC 84 : MarkBufferDirty(newBuffer);
3062 heikki.linnakangas 1848 CBC 84 : xlrec.postfixBlkSame = false;
4131 tgl 1849 ECB : }
1850 :
1851 : /*
1852 : * And set downlink pointer in the prefix tuple to point to postfix tuple.
1853 : * (We can't avoid this step by doing the above two steps in opposite
1854 : * order, because there might not be enough space on the page to insert
1855 : * the postfix tuple first.) We have to update the local copy of the
1856 : * prefixTuple too, because that's what will be written to WAL.
1857 : */
2420 tgl 1858 GIC 321 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
2420 tgl 1859 ECB : postfixBlkno, postfixOffset);
4131 tgl 1860 GIC 321 : prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
2118 tgl 1861 CBC 321 : PageGetItemId(current->page, current->offnum));
2420 1862 321 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
2420 tgl 1863 ECB : postfixBlkno, postfixOffset);
1864 :
4131 tgl 1865 GIC 321 : MarkBufferDirty(current->buffer);
4131 tgl 1866 ECB :
1467 heikki.linnakangas 1867 GIC 321 : if (RelationNeedsWAL(index) && !state->isBuild)
4131 tgl 1868 ECB : {
1869 : XLogRecPtr recptr;
1870 :
3062 heikki.linnakangas 1871 GIC 306 : XLogBeginInsert();
3062 heikki.linnakangas 1872 CBC 306 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1873 306 : XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1874 306 : XLogRegisterData((char *) postfixTuple, postfixTuple->size);
3062 heikki.linnakangas 1875 ECB :
3062 heikki.linnakangas 1876 GIC 306 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
3062 heikki.linnakangas 1877 CBC 306 : if (newBuffer != InvalidBuffer)
3062 heikki.linnakangas 1878 ECB : {
1879 : int flags;
1880 :
3062 heikki.linnakangas 1881 GIC 81 : flags = REGBUF_STANDARD;
3062 heikki.linnakangas 1882 CBC 81 : if (xlrec.newPage)
1883 3 : flags |= REGBUF_WILL_INIT;
1884 81 : XLogRegisterBuffer(1, newBuffer, flags);
3062 heikki.linnakangas 1885 ECB : }
1886 :
3062 heikki.linnakangas 1887 GIC 306 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
4131 tgl 1888 ECB :
4131 tgl 1889 GIC 306 : PageSetLSN(current->page, recptr);
4131 tgl 1890 ECB :
4131 tgl 1891 GIC 306 : if (newBuffer != InvalidBuffer)
4131 tgl 1892 ECB : {
2545 kgrittn 1893 GIC 81 : PageSetLSN(BufferGetPage(newBuffer), recptr);
4131 tgl 1894 ECB : }
1895 : }
1896 :
4131 tgl 1897 GIC 321 : END_CRIT_SECTION();
4131 tgl 1898 ECB :
1899 : /* Update local free-space cache and release buffer */
4131 tgl 1900 GIC 321 : if (newBuffer != InvalidBuffer)
4131 tgl 1901 ECB : {
4131 tgl 1902 GIC 84 : SpGistSetLastUsedPage(index, newBuffer);
4131 tgl 1903 CBC 84 : UnlockReleaseBuffer(newBuffer);
4131 tgl 1904 ECB : }
4131 tgl 1905 GIC 321 : }
4131 tgl 1906 ECB :
1907 : /*
1908 : * Insert one item into the index.
1909 : *
1910 : * Returns true on success, false if we failed to complete the insertion
1911 : * (typically because of conflict with a concurrent insert). In the latter
1912 : * case, caller should re-call spgdoinsert() with the same args.
1913 : */
1914 : bool
4131 tgl 1915 GIC 403184 : spgdoinsert(Relation index, SpGistState *state,
734 tgl 1916 ECB : ItemPointer heapPtr, Datum *datums, bool *isnulls)
1917 : {
695 tgl 1918 GIC 403184 : bool result = true;
734 tgl 1919 CBC 403184 : TupleDesc leafDescriptor = state->leafTupDesc;
1920 403184 : bool isnull = isnulls[spgKeyColumn];
4131 1921 403184 : int level = 0;
734 tgl 1922 ECB : Datum leafDatums[INDEX_MAX_KEYS];
1923 : int leafSize;
1924 : int bestLeafSize;
695 tgl 1925 GIC 403184 : int numNoProgressCycles = 0;
4131 tgl 1926 ECB : SPPageDesc current,
1927 : parent;
3875 heikki.linnakangas 1928 GIC 403184 : FmgrInfo *procinfo = NULL;
3875 heikki.linnakangas 1929 ECB :
1930 : /*
1931 : * Look up FmgrInfo of the user-defined choose function once, to save
1932 : * cycles in the loop below.
1933 : */
3875 heikki.linnakangas 1934 GIC 403184 : if (!isnull)
3875 heikki.linnakangas 1935 CBC 403139 : procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
4131 tgl 1936 ECB :
1937 : /*
1938 : * Prepare the leaf datum to insert.
1939 : *
1940 : * If an optional "compress" method is provided, then call it to form the
1941 : * leaf key datum from the input datum. Otherwise, store the input datum
1942 : * as is. Since we don't use index_form_tuple in this AM, we have to make
1943 : * sure value to be inserted is not toasted; FormIndexDatum doesn't
1944 : * guarantee that. But we assume the "compress" method to return an
1945 : * untoasted value.
1946 : */
1934 teodor 1947 GIC 403184 : if (!isnull)
1934 teodor 1948 ECB : {
1934 teodor 1949 GIC 403139 : if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1934 teodor 1950 ECB : {
1934 teodor 1951 GIC 39622 : FmgrInfo *compressProcinfo = NULL;
1934 teodor 1952 ECB :
1934 teodor 1953 GIC 39622 : compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
734 tgl 1954 CBC 39622 : leafDatums[spgKeyColumn] =
1955 39622 : FunctionCall1Coll(compressProcinfo,
1956 39622 : index->rd_indcollation[spgKeyColumn],
734 tgl 1957 ECB : datums[spgKeyColumn]);
1958 : }
1959 : else
1960 : {
1934 teodor 1961 GIC 363517 : Assert(state->attLeafType.type == state->attType.type);
4131 tgl 1962 ECB :
1934 teodor 1963 GIC 363517 : if (state->attType.attlen == -1)
734 tgl 1964 CBC 90332 : leafDatums[spgKeyColumn] =
1965 90332 : PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1934 teodor 1966 ECB : else
734 tgl 1967 GIC 273185 : leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1934 teodor 1968 ECB : }
1969 : }
1970 : else
734 tgl 1971 GIC 45 : leafDatums[spgKeyColumn] = (Datum) 0;
734 tgl 1972 ECB :
1973 : /* Likewise, ensure that any INCLUDE values are not toasted */
734 tgl 1974 GIC 449437 : for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
734 tgl 1975 ECB : {
734 tgl 1976 GIC 46253 : if (!isnulls[i])
734 tgl 1977 ECB : {
734 tgl 1978 GIC 42806 : if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
734 tgl 1979 CBC 6622 : leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
734 tgl 1980 ECB : else
734 tgl 1981 GIC 36184 : leafDatums[i] = datums[i];
734 tgl 1982 ECB : }
1983 : else
734 tgl 1984 GIC 3447 : leafDatums[i] = (Datum) 0;
734 tgl 1985 ECB : }
1986 :
1987 : /*
1988 : * Compute space needed for a leaf tuple containing the given data.
1989 : */
734 tgl 1990 GIC 403184 : leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
738 tgl 1991 ECB : /* Account for an item pointer, too */
738 tgl 1992 GIC 403184 : leafSize += sizeof(ItemIdData);
4131 tgl 1993 ECB :
1994 : /*
1995 : * If it isn't gonna fit, and the opclass can't reduce the datum size by
1996 : * suffixing, bail out now rather than doing a lot of useless work.
1997 : */
695 tgl 1998 GIC 403184 : if (leafSize > SPGIST_PAGE_CAPACITY &&
695 tgl 1999 CBC 2 : (isnull || !state->config.longValuesOK))
4131 tgl 2000 LBC 0 : ereport(ERROR,
4131 tgl 2001 EUB : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2002 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2003 : leafSize - sizeof(ItemIdData),
2004 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2005 : RelationGetRelationName(index)),
2006 : errhint("Values larger than a buffer page cannot be indexed.")));
695 tgl 2007 GIC 403184 : bestLeafSize = leafSize;
4131 tgl 2008 ECB :
2009 : /* Initialize "current" to the appropriate root page */
4046 tgl 2010 GIC 403184 : current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
4131 tgl 2011 CBC 403184 : current.buffer = InvalidBuffer;
2012 403184 : current.page = NULL;
2013 403184 : current.offnum = FirstOffsetNumber;
2014 403184 : current.node = -1;
4131 tgl 2015 ECB :
2016 : /* "parent" is invalid for the moment */
4131 tgl 2017 GIC 403184 : parent.blkno = InvalidBlockNumber;
4131 tgl 2018 CBC 403184 : parent.buffer = InvalidBuffer;
2019 403184 : parent.page = NULL;
2020 403184 : parent.offnum = InvalidOffsetNumber;
2021 403184 : parent.node = -1;
4131 tgl 2022 ECB :
2023 : /*
2024 : * Before entering the loop, try to clear any pending interrupt condition.
2025 : * If a query cancel is pending, we might as well accept it now not later;
2026 : * while if a non-canceling condition is pending, servicing it here avoids
2027 : * having to restart the insertion and redo all the work so far.
2028 : */
695 tgl 2029 GIC 403184 : CHECK_FOR_INTERRUPTS();
695 tgl 2030 ECB :
2031 : for (;;)
4131 tgl 2032 GIC 9331578 : {
4131 tgl 2033 CBC 9734762 : bool isNew = false;
4131 tgl 2034 ECB :
2035 : /*
2036 : * Bail out if query cancel is pending. We must have this somewhere
2037 : * in the loop since a broken opclass could produce an infinite
2038 : * picksplit loop. However, because we'll be holding buffer lock(s)
2039 : * after the first iteration, ProcessInterrupts() wouldn't be able to
2040 : * throw a cancel error here. Hence, if we see that an interrupt is
2041 : * pending, break out of the loop and deal with the situation below.
2042 : * Set result = false because we must restart the insertion if the
2043 : * interrupt isn't a query-cancel-or-die case.
2044 : */
695 tgl 2045 GIC 9734762 : if (INTERRUPTS_PENDING_CONDITION())
695 tgl 2046 ECB : {
695 tgl 2047 UIC 0 : result = false;
695 tgl 2048 UBC 0 : break;
695 tgl 2049 EUB : }
2050 :
4131 tgl 2051 GIC 9734762 : if (current.blkno == InvalidBlockNumber)
4131 tgl 2052 ECB : {
2053 : /*
2054 : * Create a leaf page. If leafSize is too large to fit on a page,
2055 : * we won't actually use the page yet, but it simplifies the API
2056 : * for doPickSplit to always have a leaf page at hand; so just
2057 : * quietly limit our request to a page size.
2058 : */
4046 tgl 2059 GIC 1035 : current.buffer =
4046 tgl 2060 CBC 1035 : SpGistGetBuffer(index,
4046 tgl 2061 ECB : GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
4046 tgl 2062 GIC 1035 : Min(leafSize, SPGIST_PAGE_CAPACITY),
4046 tgl 2063 ECB : &isNew);
4131 tgl 2064 GIC 1035 : current.blkno = BufferGetBlockNumber(current.buffer);
4131 tgl 2065 ECB : }
3586 tgl 2066 GIC 9733727 : else if (parent.buffer == InvalidBuffer)
4131 tgl 2067 ECB : {
2068 : /* we hold no parent-page lock, so no deadlock is possible */
4131 tgl 2069 GIC 403184 : current.buffer = ReadBuffer(index, current.blkno);
4131 tgl 2070 CBC 403184 : LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
4131 tgl 2071 ECB : }
3586 tgl 2072 GIC 9330543 : else if (current.blkno != parent.blkno)
3586 tgl 2073 ECB : {
2074 : /* descend to a new child page */
3586 tgl 2075 GIC 786068 : current.buffer = ReadBuffer(index, current.blkno);
3586 tgl 2076 ECB :
2077 : /*
2078 : * Attempt to acquire lock on child page. We must beware of
2079 : * deadlock against another insertion process descending from that
2080 : * page to our parent page (see README). If we fail to get lock,
2081 : * abandon the insertion and tell our caller to start over.
2082 : *
2083 : * XXX this could be improved, because failing to get lock on a
2084 : * buffer is not proof of a deadlock situation; the lock might be
2085 : * held by a reader, or even just background writer/checkpointer
2086 : * process. Perhaps it'd be worth retrying after sleeping a bit?
2087 : */
3586 tgl 2088 GIC 786068 : if (!ConditionalLockBuffer(current.buffer))
3586 tgl 2089 ECB : {
3586 tgl 2090 GIC 598 : ReleaseBuffer(current.buffer);
3586 tgl 2091 CBC 598 : UnlockReleaseBuffer(parent.buffer);
2092 598 : return false;
3586 tgl 2093 ECB : }
2094 : }
2095 : else
2096 : {
2097 : /* inner tuple can be stored on the same page as parent one */
4131 tgl 2098 GIC 8544475 : current.buffer = parent.buffer;
4131 tgl 2099 ECB : }
2545 kgrittn 2100 GIC 9734164 : current.page = BufferGetPage(current.buffer);
4131 tgl 2101 ECB :
2102 : /* should not arrive at a page of the wrong type */
4046 tgl 2103 GIC 19468283 : if (isnull ? !SpGistPageStoresNulls(current.page) :
4046 tgl 2104 CBC 9734119 : SpGistPageStoresNulls(current.page))
4046 tgl 2105 LBC 0 : elog(ERROR, "SPGiST index page %u has wrong nulls flag",
4046 tgl 2106 EUB : current.blkno);
2107 :
4131 tgl 2108 GIC 9734164 : if (SpGistPageIsLeaf(current.page))
4131 tgl 2109 ECB : {
2110 : SpGistLeafTuple leafTuple;
2111 : int nToSplit,
2112 : sizeToSplit;
2113 :
734 tgl 2114 GIC 402721 : leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
4131 tgl 2115 CBC 805442 : if (leafTuple->size + sizeof(ItemIdData) <=
2116 402721 : SpGistPageGetFreeSpace(current.page, 1))
4131 tgl 2117 ECB : {
2118 : /* it fits on page, so insert it and we're done */
4131 tgl 2119 GIC 398773 : addLeafTuple(index, state, leafTuple,
4046 tgl 2120 ECB : ¤t, &parent, isnull, isNew);
4131 tgl 2121 GIC 402584 : break;
4131 tgl 2122 ECB : }
4131 tgl 2123 GIC 3948 : else if ((sizeToSplit =
4131 tgl 2124 CBC 3948 : checkSplitConditions(index, state, ¤t,
2118 2125 1790 : &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
4131 2126 1790 : nToSplit < 64 &&
2127 1132 : leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
4131 tgl 2128 ECB : {
2129 : /*
2130 : * the amount of data is pretty small, so just move the whole
2131 : * chain to another leaf page rather than splitting it.
2132 : */
4131 tgl 2133 GIC 1108 : Assert(!isNew);
4046 tgl 2134 CBC 1108 : moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
4131 2135 1108 : break; /* we're done */
4131 tgl 2136 ECB : }
2137 : else
2138 : {
2139 : /* picksplit */
4131 tgl 2140 GIC 2840 : if (doPickSplit(index, state, ¤t, &parent,
4046 tgl 2141 ECB : leafTuple, level, isnull, isNew))
4131 tgl 2142 GIC 2703 : break; /* doPickSplit installed new tuples */
4131 tgl 2143 ECB :
2144 : /* leaf tuple will not be inserted yet */
4131 tgl 2145 GIC 137 : pfree(leafTuple);
4131 tgl 2146 ECB :
2147 : /*
2148 : * current now describes new inner tuple, go insert into it
2149 : */
4131 tgl 2150 GIC 137 : Assert(!SpGistPageIsLeaf(current.page));
4131 tgl 2151 CBC 137 : goto process_inner_tuple;
4131 tgl 2152 ECB : }
2153 : }
2154 : else /* non-leaf page */
2155 : {
2156 : /*
2157 : * Apply the opclass choose function to figure out how to insert
2158 : * the given datum into the current inner tuple.
2159 : */
2160 : SpGistInnerTuple innerTuple;
2161 : spgChooseIn in;
2162 : spgChooseOut out;
2163 :
2164 : /*
2165 : * spgAddNode and spgSplitTuple cases will loop back to here to
2166 : * complete the insertion operation. Just in case the choose
2167 : * function is broken and produces add or split requests
2168 : * repeatedly, check for query cancel (see comments above).
2169 : */
4131 tgl 2170 GIC 9331580 : process_inner_tuple:
695 tgl 2171 CBC 9332684 : if (INTERRUPTS_PENDING_CONDITION())
695 tgl 2172 ECB : {
695 tgl 2173 UIC 0 : result = false;
695 tgl 2174 UBC 0 : break;
695 tgl 2175 EUB : }
2176 :
4131 tgl 2177 GIC 9332684 : innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2118 tgl 2178 CBC 9332684 : PageGetItemId(current.page, current.offnum));
4131 tgl 2179 ECB :
734 tgl 2180 GIC 9332684 : in.datum = datums[spgKeyColumn];
734 tgl 2181 CBC 9332684 : in.leafDatum = leafDatums[spgKeyColumn];
4131 2182 9332684 : in.level = level;
2183 9332684 : in.allTheSame = innerTuple->allTheSame;
2184 9332684 : in.hasPrefix = (innerTuple->prefixSize > 0);
2185 9332684 : in.prefixDatum = SGITDATUM(innerTuple, state);
2186 9332684 : in.nNodes = innerTuple->nNodes;
2187 9332684 : in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
4131 tgl 2188 ECB :
4131 tgl 2189 GIC 9332684 : memset(&out, 0, sizeof(out));
4131 tgl 2190 ECB :
4046 tgl 2191 GIC 9332684 : if (!isnull)
4046 tgl 2192 ECB : {
2193 : /* use user-defined choose method */
4046 tgl 2194 GIC 9332684 : FunctionCall2Coll(procinfo,
4046 tgl 2195 CBC 9332684 : index->rd_indcollation[0],
4046 tgl 2196 ECB : PointerGetDatum(&in),
2197 : PointerGetDatum(&out));
2198 : }
2199 : else
2200 : {
2201 : /* force "match" action (to insert to random subnode) */
4046 tgl 2202 UIC 0 : out.resultType = spgMatchNode;
4046 tgl 2203 EUB : }
2204 :
4131 tgl 2205 GIC 9332684 : if (innerTuple->allTheSame)
4131 tgl 2206 ECB : {
2207 : /*
2208 : * It's not allowed to do an AddNode at an allTheSame tuple.
2209 : * Opclass must say "match", in which case we choose a random
2210 : * one of the nodes to descend into, or "split".
2211 : */
4131 tgl 2212 GIC 71207 : if (out.resultType == spgAddNode)
4131 tgl 2213 LBC 0 : elog(ERROR, "cannot add a node to an allTheSame inner tuple");
4131 tgl 2214 GBC 71207 : else if (out.resultType == spgMatchNode)
497 tgl 2215 CBC 71201 : out.result.matchNode.nodeN =
2216 71201 : pg_prng_uint64_range(&pg_global_prng_state,
2217 71201 : 0, innerTuple->nNodes - 1);
4131 tgl 2218 ECB : }
2219 :
4131 tgl 2220 GIC 9332684 : switch (out.resultType)
4131 tgl 2221 ECB : {
4131 tgl 2222 GIC 9331580 : case spgMatchNode:
4131 tgl 2223 ECB : /* Descend to N'th child node */
4131 tgl 2224 GIC 9331580 : spgMatchNodeAction(index, state, innerTuple,
4131 tgl 2225 ECB : ¤t, &parent,
2226 : out.result.matchNode.nodeN);
2227 : /* Adjust level as per opclass request */
4131 tgl 2228 GIC 9331580 : level += out.result.matchNode.levelAdd;
4131 tgl 2229 ECB : /* Replace leafDatum and recompute leafSize */
4046 tgl 2230 GIC 9331580 : if (!isnull)
4046 tgl 2231 ECB : {
734 tgl 2232 GIC 9331580 : leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
734 tgl 2233 CBC 9331580 : leafSize = SpGistGetLeafTupleSize(leafDescriptor,
734 tgl 2234 ECB : leafDatums, isnulls);
738 tgl 2235 GIC 9331580 : leafSize += sizeof(ItemIdData);
4046 tgl 2236 ECB : }
2237 :
2238 : /*
2239 : * Check new tuple size; fail if it can't fit, unless the
2240 : * opclass says it can handle the situation by suffixing.
2241 : *
2242 : * However, the opclass can only shorten the leaf datum,
2243 : * which may not be enough to ever make the tuple fit,
2244 : * since INCLUDE columns might alone use more than a page.
2245 : * Depending on the opclass' behavior, that could lead to
2246 : * an infinite loop --- spgtextproc.c, for example, will
2247 : * just repeatedly generate an empty-string leaf datum
2248 : * once it runs out of data. Actual bugs in opclasses
2249 : * might cause infinite looping, too. To detect such a
2250 : * loop, check to see if we are making progress by
2251 : * reducing the leafSize in each pass. This is a bit
2252 : * tricky though. Because of alignment considerations,
2253 : * the total tuple size might not decrease on every pass.
2254 : * Also, there are edge cases where the choose method
2255 : * might seem to not make progress for a cycle or two.
2256 : * Somewhat arbitrarily, we allow up to 10 no-progress
2257 : * iterations before failing. (This limit should be more
2258 : * than MAXALIGN, to accommodate opclasses that trim one
2259 : * byte from the leaf datum per pass.)
2260 : */
695 tgl 2261 GIC 9331580 : if (leafSize > SPGIST_PAGE_CAPACITY)
695 tgl 2262 ECB : {
695 tgl 2263 GIC 26 : bool ok = false;
695 tgl 2264 ECB :
695 tgl 2265 GIC 26 : if (state->config.longValuesOK && !isnull)
695 tgl 2266 ECB : {
695 tgl 2267 GIC 26 : if (leafSize < bestLeafSize)
695 tgl 2268 ECB : {
695 tgl 2269 GIC 2 : ok = true;
695 tgl 2270 CBC 2 : bestLeafSize = leafSize;
2271 2 : numNoProgressCycles = 0;
695 tgl 2272 ECB : }
695 tgl 2273 GIC 24 : else if (++numNoProgressCycles < 10)
695 tgl 2274 CBC 22 : ok = true;
695 tgl 2275 ECB : }
695 tgl 2276 GIC 26 : if (!ok)
695 tgl 2277 CBC 2 : ereport(ERROR,
695 tgl 2278 ECB : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2279 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2280 : leafSize - sizeof(ItemIdData),
2281 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2282 : RelationGetRelationName(index)),
2283 : errhint("Values larger than a buffer page cannot be indexed.")));
2284 : }
2285 :
2286 : /*
2287 : * Loop around and attempt to insert the new leafDatum at
2288 : * "current" (which might reference an existing child
2289 : * tuple, or might be invalid to force us to find a new
2290 : * page for the tuple).
2291 : */
4131 tgl 2292 GIC 9331578 : break;
4131 tgl 2293 CBC 783 : case spgAddNode:
4131 tgl 2294 ECB : /* AddNode is not sensible if nodes don't have labels */
4131 tgl 2295 GIC 783 : if (in.nodeLabels == NULL)
4131 tgl 2296 LBC 0 : elog(ERROR, "cannot add a node to an inner tuple without node labels");
4131 tgl 2297 EUB : /* Add node to inner tuple, per request */
4131 tgl 2298 GIC 783 : spgAddNodeAction(index, state, innerTuple,
4131 tgl 2299 ECB : ¤t, &parent,
2300 : out.result.addNode.nodeN,
2301 : out.result.addNode.nodeLabel);
2302 :
2303 : /*
2304 : * Retry insertion into the enlarged node. We assume that
2305 : * we'll get a MatchNode result this time.
2306 : */
4131 tgl 2307 GIC 783 : goto process_inner_tuple;
4131 tgl 2308 ECB : break;
4131 tgl 2309 GIC 321 : case spgSplitTuple:
4131 tgl 2310 ECB : /* Split inner tuple, per request */
4131 tgl 2311 GIC 321 : spgSplitNodeAction(index, state, innerTuple,
4131 tgl 2312 ECB : ¤t, &out);
2313 :
2314 : /* Retry insertion into the split node */
4131 tgl 2315 GIC 321 : goto process_inner_tuple;
4131 tgl 2316 ECB : break;
4131 tgl 2317 UIC 0 : default:
4131 tgl 2318 UBC 0 : elog(ERROR, "unrecognized SPGiST choose result: %d",
4131 tgl 2319 EUB : (int) out.resultType);
2320 : break;
2321 : }
2322 : }
2323 : } /* end loop */
2324 :
2325 : /*
2326 : * Release any buffers we're still holding. Beware of possibility that
2327 : * current and parent reference same buffer.
2328 : */
4131 tgl 2329 GIC 402584 : if (current.buffer != InvalidBuffer)
4131 tgl 2330 ECB : {
4131 tgl 2331 GIC 402584 : SpGistSetLastUsedPage(index, current.buffer);
4131 tgl 2332 CBC 402584 : UnlockReleaseBuffer(current.buffer);
4131 tgl 2333 ECB : }
4131 tgl 2334 GIC 402584 : if (parent.buffer != InvalidBuffer &&
4131 tgl 2335 CBC 394098 : parent.buffer != current.buffer)
4131 tgl 2336 ECB : {
4131 tgl 2337 GIC 391781 : SpGistSetLastUsedPage(index, parent.buffer);
4131 tgl 2338 CBC 391781 : UnlockReleaseBuffer(parent.buffer);
4131 tgl 2339 ECB : }
2340 :
2341 : /*
2342 : * We do not support being called while some outer function is holding a
2343 : * buffer lock (or any other reason to postpone query cancels). If that
2344 : * were the case, telling the caller to retry would create an infinite
2345 : * loop.
2346 : */
695 tgl 2347 GIC 402584 : Assert(INTERRUPTS_CAN_BE_PROCESSED());
695 tgl 2348 ECB :
2349 : /*
2350 : * Finally, check for interrupts again. If there was a query cancel,
2351 : * ProcessInterrupts() will be able to throw the error here. If it was
2352 : * some other kind of interrupt that can just be cleared, return false to
2353 : * tell our caller to retry.
2354 : */
695 tgl 2355 GIC 402584 : CHECK_FOR_INTERRUPTS();
695 tgl 2356 ECB :
695 tgl 2357 GIC 402584 : return result;
4131 tgl 2358 ECB : }
|