Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * spgdoinsert.c
4 : : * implementation of insert algorithm
5 : : *
6 : : *
7 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/spgist/spgdoinsert.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : #include "postgres.h"
17 : :
18 : : #include "access/genam.h"
19 : : #include "access/spgist_private.h"
20 : : #include "access/spgxlog.h"
21 : : #include "access/xloginsert.h"
22 : : #include "common/int.h"
23 : : #include "common/pg_prng.h"
24 : : #include "miscadmin.h"
25 : : #include "storage/bufmgr.h"
26 : : #include "utils/rel.h"
27 : :
28 : :
29 : : /*
30 : : * SPPageDesc tracks all info about a page we are inserting into. In some
31 : : * situations it actually identifies a tuple, or even a specific node within
32 : : * an inner tuple. But any of the fields can be invalid. If the buffer
33 : : * field is valid, it implies we hold pin and exclusive lock on that buffer.
34 : : * page pointer should be valid exactly when buffer is.
35 : : */
36 : : typedef struct SPPageDesc
37 : : {
38 : : BlockNumber blkno; /* block number, or InvalidBlockNumber */
39 : : Buffer buffer; /* page's buffer number, or InvalidBuffer */
40 : : Page page; /* pointer to page buffer, or NULL */
41 : : OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
42 : : int node; /* node number within inner tuple, or -1 */
43 : : } SPPageDesc;
44 : :
45 : :
46 : : /*
47 : : * Set the item pointer in the nodeN'th entry in inner tuple tup. This
48 : : * is used to update the parent inner tuple's downlink after a move or
49 : : * split operation.
50 : : */
51 : : void
4500 tgl@sss.pgh.pa.us 52 :CBC 6011 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
53 : : BlockNumber blkno, OffsetNumber offset)
54 : : {
55 : : int i;
56 : : SpGistNodeTuple node;
57 : :
4502 58 [ + - ]: 30706 : SGITITERATE(tup, i, node)
59 : : {
60 [ + + ]: 30706 : if (i == nodeN)
61 : : {
62 : 6011 : ItemPointerSet(&node->t_tid, blkno, offset);
63 : 6011 : return;
64 : : }
65 : : }
66 : :
4502 tgl@sss.pgh.pa.us 67 [ # # ]:UBC 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68 : : nodeN);
69 : : }
70 : :
71 : : /*
72 : : * Form a new inner tuple containing one more node than the given one, with
73 : : * the specified label datum, inserted at offset "offset" in the node array.
74 : : * The new tuple's prefix is the same as the old one's.
75 : : *
76 : : * Note that the new node initially has an invalid downlink. We'll find a
77 : : * page to point it to later.
78 : : */
79 : : static SpGistInnerTuple
4502 tgl@sss.pgh.pa.us 80 :CBC 789 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
81 : : {
82 : : SpGistNodeTuple node,
83 : : *nodes;
84 : : int i;
85 : :
86 : : /* if offset is negative, insert at end */
87 [ - + ]: 789 : if (offset < 0)
4502 tgl@sss.pgh.pa.us 88 :UBC 0 : offset = tuple->nNodes;
4502 tgl@sss.pgh.pa.us 89 [ - + ]:CBC 789 : else if (offset > tuple->nNodes)
4502 tgl@sss.pgh.pa.us 90 [ # # ]:UBC 0 : elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91 : :
4502 tgl@sss.pgh.pa.us 92 :CBC 789 : nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
93 [ + + ]: 5059 : SGITITERATE(tuple, i, node)
94 : : {
95 [ + + ]: 4270 : if (i < offset)
96 : 3687 : nodes[i] = node;
97 : : else
98 : 583 : nodes[i + 1] = node;
99 : : }
100 : :
101 : 789 : nodes[offset] = spgFormNodeTuple(state, label, false);
102 : :
103 : 789 : return spgFormInnerTuple(state,
104 : 789 : (tuple->prefixSize > 0),
105 [ - + ]: 372 : SGITDATUM(tuple, state),
106 [ + + ]: 789 : tuple->nNodes + 1,
107 : : nodes);
108 : : }
109 : :
110 : : /* qsort comparator for sorting OffsetNumbers */
111 : : static int
112 : 3192144 : cmpOffsetNumbers(const void *a, const void *b)
113 : : {
58 nathan@postgresql.or 114 :GNC 3192144 : return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115 : : }
116 : :
117 : : /*
118 : : * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 : : *
120 : : * The first tuple in the given list is replaced with a dead tuple of type
121 : : * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 : : * with dead tuples of type "reststate". If either firststate or reststate
123 : : * is REDIRECT, blkno/offnum specify where to link to.
124 : : *
125 : : * NB: this is used during WAL replay, so beware of trying to make it too
126 : : * smart. In particular, it shouldn't use "state" except for calling
127 : : * spgFormDeadTuple(). This is also used in a critical section, so no
128 : : * pallocs either!
129 : : */
130 : : void
4502 tgl@sss.pgh.pa.us 131 :CBC 5456 : spgPageIndexMultiDelete(SpGistState *state, Page page,
132 : : OffsetNumber *itemnos, int nitems,
133 : : int firststate, int reststate,
134 : : BlockNumber blkno, OffsetNumber offnum)
135 : : {
136 : : OffsetNumber firstItem;
137 : : OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 : 5456 : SpGistDeadTuple tuple = NULL;
139 : : int i;
140 : :
141 [ + + ]: 5456 : if (nitems == 0)
142 : 491 : return; /* nothing to do */
143 : :
144 : : /*
145 : : * For efficiency we want to use PageIndexMultiDelete, which requires the
146 : : * targets to be listed in sorted order, so we have to sort the itemnos
147 : : * array. (This also greatly simplifies the math for reinserting the
148 : : * replacement tuples.) However, we must not scribble on the caller's
149 : : * array, so we have to make a copy.
150 : : */
151 : 4965 : memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 [ + + ]: 4965 : if (nitems > 1)
153 : 4521 : qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 : :
155 : 4965 : PageIndexMultiDelete(page, sortednos, nitems);
156 : :
157 : 4965 : firstItem = itemnos[0];
158 : :
159 [ + + ]: 454728 : for (i = 0; i < nitems; i++)
160 : : {
4326 bruce@momjian.us 161 : 449763 : OffsetNumber itemno = sortednos[i];
162 : : int tupstate;
163 : :
4502 tgl@sss.pgh.pa.us 164 [ + + ]: 449763 : tupstate = (itemno == firstItem) ? firststate : reststate;
165 [ + + + + ]: 449763 : if (tuple == NULL || tuple->tupstate != tupstate)
166 : 7292 : tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 : :
168 [ - + ]: 449763 : if (PageAddItem(page, (Item) tuple, tuple->size,
169 : : itemno, false, false) != itemno)
4502 tgl@sss.pgh.pa.us 170 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 : : tuple->size);
172 : :
4502 tgl@sss.pgh.pa.us 173 [ + + ]:CBC 449763 : if (tupstate == SPGIST_REDIRECT)
174 : 1202 : SpGistPageGetOpaque(page)->nRedirection++;
175 [ + - ]: 448561 : else if (tupstate == SPGIST_PLACEHOLDER)
176 : 448561 : SpGistPageGetOpaque(page)->nPlaceholder++;
177 : : }
178 : : }
179 : :
180 : : /*
181 : : * Update the parent inner tuple's downlink, and mark the parent buffer
182 : : * dirty (this must be the last change to the parent page in the current
183 : : * WAL action).
184 : : */
185 : : static void
186 : 4956 : saveNodeLink(Relation index, SPPageDesc *parent,
187 : : BlockNumber blkno, OffsetNumber offnum)
188 : : {
189 : : SpGistInnerTuple innerTuple;
190 : :
191 : 4956 : innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
2489 192 : 4956 : PageGetItemId(parent->page, parent->offnum));
193 : :
4500 194 : 4956 : spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 : :
4502 196 : 4956 : MarkBufferDirty(parent->buffer);
197 : 4956 : }
198 : :
199 : : /*
200 : : * Add a leaf tuple to a leaf page where there is known to be room for it
201 : : */
202 : : static void
203 : 398845 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204 : : SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 : : {
206 : : spgxlogAddLeaf xlrec;
207 : :
208 : 398845 : xlrec.newPage = isNew;
4417 209 : 398845 : xlrec.storesNulls = isNulls;
210 : :
211 : : /* these will be filled below as needed */
4502 212 : 398845 : xlrec.offnumLeaf = InvalidOffsetNumber;
213 : 398845 : xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214 : 398845 : xlrec.offnumParent = InvalidOffsetNumber;
215 : 398845 : xlrec.nodeI = 0;
216 : :
217 : 398845 : START_CRIT_SECTION();
218 : :
219 [ + + ]: 398845 : if (current->offnum == InvalidOffsetNumber ||
4417 220 [ + + + + ]: 397827 : SpGistBlockIsRoot(current->blkno))
221 : : {
222 : : /* Tuple is not part of a chain */
1105 223 : 9502 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
4502 224 : 19004 : current->offnum = SpGistPageAddNewItem(state, current->page,
2489 225 : 9502 : (Item) leafTuple, leafTuple->size,
226 : : NULL, false);
227 : :
4502 228 : 9502 : xlrec.offnumLeaf = current->offnum;
229 : :
230 : : /* Must update parent's downlink if any */
231 [ + + ]: 9502 : if (parent->buffer != InvalidBuffer)
232 : : {
233 : 1018 : xlrec.offnumParent = parent->offnum;
234 : 1018 : xlrec.nodeI = parent->node;
235 : :
236 : 1018 : saveNodeLink(index, parent, current->blkno, current->offnum);
237 : : }
238 : : }
239 : : else
240 : : {
241 : : /*
242 : : * Tuple must be inserted into existing chain. We mustn't change the
243 : : * chain's head address, but we don't need to chase the entire chain
244 : : * to put the tuple at the end; we can insert it second.
245 : : *
246 : : * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 : : * in which case we should replace the DEAD tuple in-place.
248 : : */
249 : : SpGistLeafTuple head;
250 : : OffsetNumber offnum;
251 : :
252 : 389343 : head = (SpGistLeafTuple) PageGetItem(current->page,
2489 253 : 389343 : PageGetItemId(current->page, current->offnum));
4502 254 [ + - ]: 389343 : if (head->tupstate == SPGIST_LIVE)
255 : : {
1105 256 : 389343 : SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
4502 257 : 389343 : offnum = SpGistPageAddNewItem(state, current->page,
258 : 389343 : (Item) leafTuple, leafTuple->size,
259 : : NULL, false);
260 : :
261 : : /*
262 : : * re-get head of list because it could have been moved on page,
263 : : * and set new second element
264 : : */
265 : 389343 : head = (SpGistLeafTuple) PageGetItem(current->page,
2489 266 : 389343 : PageGetItemId(current->page, current->offnum));
1105 267 : 389343 : SGLT_SET_NEXTOFFSET(head, offnum);
268 : :
4502 269 : 389343 : xlrec.offnumLeaf = offnum;
270 : 389343 : xlrec.offnumHeadLeaf = current->offnum;
271 : : }
4502 tgl@sss.pgh.pa.us 272 [ # # ]:UBC 0 : else if (head->tupstate == SPGIST_DEAD)
273 : : {
1105 274 : 0 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
4502 275 : 0 : PageIndexTupleDelete(current->page, current->offnum);
276 : 0 : if (PageAddItem(current->page,
277 : : (Item) leafTuple, leafTuple->size,
278 [ # # ]: 0 : current->offnum, false, false) != current->offnum)
279 [ # # ]: 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 : : leafTuple->size);
281 : :
282 : : /* WAL replay distinguishes this case by equal offnums */
283 : 0 : xlrec.offnumLeaf = current->offnum;
284 : 0 : xlrec.offnumHeadLeaf = current->offnum;
285 : : }
286 : : else
287 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 : : }
289 : :
4502 tgl@sss.pgh.pa.us 290 :CBC 398845 : MarkBufferDirty(current->buffer);
291 : :
1838 heikki.linnakangas@i 292 [ + + + + : 398845 : if (RelationNeedsWAL(index) && !state->isBuild)
+ + + + +
+ ]
293 : : {
294 : : XLogRecPtr recptr;
295 : : int flags;
296 : :
3433 297 : 120251 : XLogBeginInsert();
298 : 120251 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 : 120251 : XLogRegisterData((char *) leafTuple, leafTuple->size);
300 : :
3191 301 : 120251 : flags = REGBUF_STANDARD;
302 [ + + ]: 120251 : if (xlrec.newPage)
303 : 4 : flags |= REGBUF_WILL_INIT;
304 : 120251 : XLogRegisterBuffer(0, current->buffer, flags);
3433 305 [ + + ]: 120251 : if (xlrec.offnumParent != InvalidOffsetNumber)
306 : 413 : XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307 : :
308 : 120251 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 : :
4502 tgl@sss.pgh.pa.us 310 : 120251 : PageSetLSN(current->page, recptr);
311 : :
312 : : /* update parent only if we actually changed it */
3433 heikki.linnakangas@i 313 [ + + ]: 120251 : if (xlrec.offnumParent != InvalidOffsetNumber)
314 : : {
4502 tgl@sss.pgh.pa.us 315 : 413 : PageSetLSN(parent->page, recptr);
316 : : }
317 : : }
318 : :
319 [ - + ]: 398845 : END_CRIT_SECTION();
320 : 398845 : }
321 : :
322 : : /*
323 : : * Count the number and total size of leaf tuples in the chain starting at
324 : : * current->offnum. Return number into *nToSplit and total size as function
325 : : * result.
326 : : *
327 : : * Klugy special case when considering the root page (i.e., root is a leaf
328 : : * page, but we're about to split for the first time): return fake large
329 : : * values to force spgdoinsert() to take the doPickSplit rather than
330 : : * moveLeafs code path. moveLeafs is not prepared to deal with root page.
331 : : */
332 : : static int
333 : 3976 : checkSplitConditions(Relation index, SpGistState *state,
334 : : SPPageDesc *current, int *nToSplit)
335 : : {
336 : : int i,
337 : 3976 : n = 0,
338 : 3976 : totalSize = 0;
339 : :
4417 340 [ + + - + ]: 3976 : if (SpGistBlockIsRoot(current->blkno))
341 : : {
342 : : /* return impossible values to force split */
4502 343 : 41 : *nToSplit = BLCKSZ;
344 : 41 : return BLCKSZ;
345 : : }
346 : :
347 : 3935 : i = current->offnum;
348 [ + + ]: 401149 : while (i != InvalidOffsetNumber)
349 : : {
350 : : SpGistLeafTuple it;
351 : :
352 [ + - - + ]: 397214 : Assert(i >= FirstOffsetNumber &&
353 : : i <= PageGetMaxOffsetNumber(current->page));
354 : 397214 : it = (SpGistLeafTuple) PageGetItem(current->page,
355 : : PageGetItemId(current->page, i));
356 [ + - ]: 397214 : if (it->tupstate == SPGIST_LIVE)
357 : : {
358 : 397214 : n++;
359 : 397214 : totalSize += it->size + sizeof(ItemIdData);
360 : : }
4502 tgl@sss.pgh.pa.us 361 [ # # ]:UBC 0 : else if (it->tupstate == SPGIST_DEAD)
362 : : {
363 : : /* We could see a DEAD tuple as first/only chain item */
364 [ # # ]: 0 : Assert(i == current->offnum);
1105 365 [ # # ]: 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
366 : : /* Don't count it in result, because it won't go to other page */
367 : : }
368 : : else
4502 369 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 : :
1105 tgl@sss.pgh.pa.us 371 :CBC 397214 : i = SGLT_GET_NEXTOFFSET(it);
372 : : }
373 : :
4502 374 : 3935 : *nToSplit = n;
375 : :
376 : 3935 : return totalSize;
377 : : }
378 : :
379 : : /*
380 : : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381 : : * but the chain has to be moved because there's not enough room to add
382 : : * newLeafTuple to its page. We use this method when the chain contains
383 : : * very little data so a split would be inefficient. We are sure we can
384 : : * fit the chain plus newLeafTuple on one other page.
385 : : */
386 : : static void
387 : 1146 : moveLeafs(Relation index, SpGistState *state,
388 : : SPPageDesc *current, SPPageDesc *parent,
389 : : SpGistLeafTuple newLeafTuple, bool isNulls)
390 : : {
391 : : int i,
392 : : nDelete,
393 : : nInsert,
394 : : size;
395 : : Buffer nbuf;
396 : : Page npage;
397 : 1146 : OffsetNumber r = InvalidOffsetNumber,
398 : 1146 : startOffset = InvalidOffsetNumber;
399 : 1146 : bool replaceDead = false;
400 : : OffsetNumber *toDelete;
401 : : OffsetNumber *toInsert;
402 : : BlockNumber nblkno;
403 : : spgxlogMoveLeafs xlrec;
404 : : char *leafdata,
405 : : *leafptr;
406 : :
407 : : /* This doesn't work on root page */
408 [ - + ]: 1146 : Assert(parent->buffer != InvalidBuffer);
409 [ - + ]: 1146 : Assert(parent->buffer != current->buffer);
410 : :
411 : : /* Locate the tuples to be moved, and count up the space needed */
412 : 1146 : i = PageGetMaxOffsetNumber(current->page);
413 : 1146 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
414 : 1146 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
415 : :
416 : 1146 : size = newLeafTuple->size + sizeof(ItemIdData);
417 : :
418 : 1146 : nDelete = 0;
419 : 1146 : i = current->offnum;
420 [ + + ]: 46541 : while (i != InvalidOffsetNumber)
421 : : {
422 : : SpGistLeafTuple it;
423 : :
424 [ + - - + ]: 45395 : Assert(i >= FirstOffsetNumber &&
425 : : i <= PageGetMaxOffsetNumber(current->page));
426 : 45395 : it = (SpGistLeafTuple) PageGetItem(current->page,
427 : : PageGetItemId(current->page, i));
428 : :
429 [ + - ]: 45395 : if (it->tupstate == SPGIST_LIVE)
430 : : {
431 : 45395 : toDelete[nDelete] = i;
432 : 45395 : size += it->size + sizeof(ItemIdData);
433 : 45395 : nDelete++;
434 : : }
4502 tgl@sss.pgh.pa.us 435 [ # # ]:UBC 0 : else if (it->tupstate == SPGIST_DEAD)
436 : : {
437 : : /* We could see a DEAD tuple as first/only chain item */
438 [ # # ]: 0 : Assert(i == current->offnum);
1105 439 [ # # ]: 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
440 : : /* We don't want to move it, so don't count it in size */
4502 441 : 0 : toDelete[nDelete] = i;
442 : 0 : nDelete++;
443 : 0 : replaceDead = true;
444 : : }
445 : : else
446 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
447 : :
1105 tgl@sss.pgh.pa.us 448 :CBC 45395 : i = SGLT_GET_NEXTOFFSET(it);
449 : : }
450 : :
451 : : /* Find a leaf page that will hold them */
4417 452 [ - + ]: 1146 : nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
453 : : size, &xlrec.newPage);
2916 kgrittn@postgresql.o 454 : 1146 : npage = BufferGetPage(nbuf);
4502 tgl@sss.pgh.pa.us 455 : 1146 : nblkno = BufferGetBlockNumber(nbuf);
456 [ - + ]: 1146 : Assert(nblkno != current->blkno);
457 : :
458 : 1146 : leafdata = leafptr = palloc(size);
459 : :
460 : 1146 : START_CRIT_SECTION();
461 : :
462 : : /* copy all the old tuples to new page, unless they're dead */
463 : 1146 : nInsert = 0;
464 [ + - ]: 1146 : if (!replaceDead)
465 : : {
466 [ + + ]: 46541 : for (i = 0; i < nDelete; i++)
467 : : {
468 : : SpGistLeafTuple it;
469 : :
470 : 45395 : it = (SpGistLeafTuple) PageGetItem(current->page,
2489 471 : 45395 : PageGetItemId(current->page, toDelete[i]));
4502 472 [ - + ]: 45395 : Assert(it->tupstate == SPGIST_LIVE);
473 : :
474 : : /*
475 : : * Update chain link (notice the chain order gets reversed, but we
476 : : * don't care). We're modifying the tuple on the source page
477 : : * here, but it's okay since we're about to delete it.
478 : : */
1105 479 : 45395 : SGLT_SET_NEXTOFFSET(it, r);
480 : :
4502 481 : 45395 : r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
482 : : &startOffset, false);
483 : :
484 : 45395 : toInsert[nInsert] = r;
485 : 45395 : nInsert++;
486 : :
487 : : /* save modified tuple into leafdata as well */
488 : 45395 : memcpy(leafptr, it, it->size);
489 : 45395 : leafptr += it->size;
490 : : }
491 : : }
492 : :
493 : : /* add the new tuple as well */
1105 494 : 1146 : SGLT_SET_NEXTOFFSET(newLeafTuple, r);
4502 495 : 1146 : r = SpGistPageAddNewItem(state, npage,
496 : 1146 : (Item) newLeafTuple, newLeafTuple->size,
497 : : &startOffset, false);
498 : 1146 : toInsert[nInsert] = r;
499 : 1146 : nInsert++;
500 : 1146 : memcpy(leafptr, newLeafTuple, newLeafTuple->size);
501 : 1146 : leafptr += newLeafTuple->size;
502 : :
503 : : /*
504 : : * Now delete the old tuples, leaving a redirection pointer behind for the
505 : : * first one, unless we're doing an index build; in which case there can't
506 : : * be any concurrent scan so we need not provide a redirect.
507 : : */
508 : 1146 : spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
2489 509 [ + + ]: 1146 : state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
510 : : SPGIST_PLACEHOLDER,
511 : : nblkno, r);
512 : :
513 : : /* Update parent's downlink and mark parent page dirty */
4502 514 : 1146 : saveNodeLink(index, parent, nblkno, r);
515 : :
516 : : /* Mark the leaf pages too */
517 : 1146 : MarkBufferDirty(current->buffer);
518 : 1146 : MarkBufferDirty(nbuf);
519 : :
1838 heikki.linnakangas@i 520 [ + + + + : 1146 : if (RelationNeedsWAL(index) && !state->isBuild)
+ + + - +
+ ]
521 : : {
522 : : XLogRecPtr recptr;
523 : :
524 : : /* prepare WAL info */
3433 525 : 264 : STORE_STATE(state, xlrec.stateSrc);
526 : :
527 : 264 : xlrec.nMoves = nDelete;
528 : 264 : xlrec.replaceDead = replaceDead;
529 : 264 : xlrec.storesNulls = isNulls;
530 : :
531 : 264 : xlrec.offnumParent = parent->offnum;
532 : 264 : xlrec.nodeI = parent->node;
533 : :
534 : 264 : XLogBeginInsert();
535 : 264 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
536 : 264 : XLogRegisterData((char *) toDelete,
537 : : sizeof(OffsetNumber) * nDelete);
538 : 264 : XLogRegisterData((char *) toInsert,
539 : : sizeof(OffsetNumber) * nInsert);
540 : 264 : XLogRegisterData((char *) leafdata, leafptr - leafdata);
541 : :
542 : 264 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
543 [ + + ]: 264 : XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
544 : 264 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
545 : :
546 : 264 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
547 : :
4502 tgl@sss.pgh.pa.us 548 : 264 : PageSetLSN(current->page, recptr);
549 : 264 : PageSetLSN(npage, recptr);
550 : 264 : PageSetLSN(parent->page, recptr);
551 : : }
552 : :
553 [ - + ]: 1146 : END_CRIT_SECTION();
554 : :
555 : : /* Update local free-space cache and release new buffer */
556 : 1146 : SpGistSetLastUsedPage(index, nbuf);
557 : 1146 : UnlockReleaseBuffer(nbuf);
558 : 1146 : }
559 : :
560 : : /*
561 : : * Update previously-created redirection tuple with appropriate destination
562 : : *
563 : : * We use this when it's not convenient to know the destination first.
564 : : * The tuple should have been made with the "impossible" destination of
565 : : * the metapage.
566 : : */
567 : : static void
568 : 650 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
569 : : BlockNumber blkno, OffsetNumber offnum)
570 : : {
571 : : SpGistDeadTuple dt;
572 : :
573 : 650 : dt = (SpGistDeadTuple) PageGetItem(current->page,
574 : : PageGetItemId(current->page, position));
575 [ - + ]: 650 : Assert(dt->tupstate == SPGIST_REDIRECT);
576 [ - + ]: 650 : Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
577 : 650 : ItemPointerSet(&dt->pointer, blkno, offnum);
578 : 650 : }
579 : :
580 : : /*
581 : : * Test to see if the user-defined picksplit function failed to do its job,
582 : : * ie, it put all the leaf tuples into the same node.
583 : : * If so, randomly divide the tuples into several nodes (all with the same
584 : : * label) and return true to select allTheSame mode for this inner tuple.
585 : : *
586 : : * (This code is also used to forcibly select allTheSame mode for nulls.)
587 : : *
588 : : * If we know that the leaf tuples wouldn't all fit on one page, then we
589 : : * exclude the last tuple (which is the incoming new tuple that forced a split)
590 : : * from the check to see if more than one node is used. The reason for this
591 : : * is that if the existing tuples are put into only one chain, then even if
592 : : * we move them all to an empty page, there would still not be room for the
593 : : * new tuple, so we'd get into an infinite loop of picksplit attempts.
594 : : * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
595 : : * be split across pages. (Exercise for the reader: figure out why this
596 : : * fixes the problem even when there is only one old tuple.)
597 : : */
598 : : static bool
599 : 2830 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
600 : : bool *includeNew)
601 : : {
602 : : int theNode;
603 : : int limit;
604 : : int i;
605 : :
606 : : /* For the moment, assume we can include the new leaf tuple */
607 : 2830 : *includeNew = true;
608 : :
609 : : /* If there's only the new leaf tuple, don't select allTheSame mode */
610 [ + + ]: 2830 : if (in->nTuples <= 1)
611 : 22 : return false;
612 : :
613 : : /* If tuple set doesn't fit on one page, ignore the new tuple in test */
614 [ + + ]: 2808 : limit = tooBig ? in->nTuples - 1 : in->nTuples;
615 : :
616 : : /* Check to see if more than one node is populated */
617 : 2808 : theNode = out->mapTuplesToNodes[0];
618 [ + + ]: 62703 : for (i = 1; i < limit; i++)
619 : : {
620 [ + + ]: 62553 : if (out->mapTuplesToNodes[i] != theNode)
621 : 2658 : return false;
622 : : }
623 : :
624 : : /* Nope, so override the picksplit function's decisions */
625 : :
626 : : /* If the new tuple is in its own node, it can't be included in split */
627 [ + + - + ]: 150 : if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
4502 tgl@sss.pgh.pa.us 628 :UBC 0 : *includeNew = false;
629 : :
4502 tgl@sss.pgh.pa.us 630 :CBC 150 : out->nNodes = 8; /* arbitrary number of child nodes */
631 : :
632 : : /* Random assignment of tuples to nodes (note we include new tuple) */
633 [ + + ]: 15906 : for (i = 0; i < in->nTuples; i++)
634 : 15756 : out->mapTuplesToNodes[i] = i % out->nNodes;
635 : :
636 : : /* The opclass may not use node labels, but if it does, duplicate 'em */
637 [ + + ]: 150 : if (out->nodeLabels)
638 : : {
4326 bruce@momjian.us 639 : 26 : Datum theLabel = out->nodeLabels[theNode];
640 : :
4502 tgl@sss.pgh.pa.us 641 : 26 : out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
642 [ + + ]: 234 : for (i = 0; i < out->nNodes; i++)
643 : 208 : out->nodeLabels[i] = theLabel;
644 : : }
645 : :
646 : : /* We don't touch the prefix or the leaf tuple datum assignments */
647 : :
648 : 150 : return true;
649 : : }
650 : :
651 : : /*
652 : : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
653 : : * but the chain has to be split because there's not enough room to add
654 : : * newLeafTuple to its page.
655 : : *
656 : : * This function splits the leaf tuple set according to picksplit's rules,
657 : : * creating one or more new chains that are spread across the current page
658 : : * and an additional leaf page (we assume that two leaf pages will be
659 : : * sufficient). A new inner tuple is created, and the parent downlink
660 : : * pointer is updated to point to that inner tuple instead of the leaf chain.
661 : : *
662 : : * On exit, current contains the address of the new inner tuple.
663 : : *
664 : : * Returns true if we successfully inserted newLeafTuple during this function,
665 : : * false if caller still has to do it (meaning another picksplit operation is
666 : : * probably needed). Failure could occur if the picksplit result is fairly
667 : : * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
668 : : * Because we force the picksplit result to be at least two chains, each
669 : : * cycle will get rid of at least one leaf tuple from the chain, so the loop
670 : : * will eventually terminate if lack of balance is the issue. If the tuple
671 : : * is too big, we assume that repeated picksplit operations will eventually
672 : : * make it small enough by repeated prefix-stripping. A broken opclass could
673 : : * make this an infinite loop, though, so spgdoinsert() checks that the
674 : : * leaf datums get smaller each time.
675 : : */
676 : : static bool
677 : 2830 : doPickSplit(Relation index, SpGistState *state,
678 : : SPPageDesc *current, SPPageDesc *parent,
679 : : SpGistLeafTuple newLeafTuple,
680 : : int level, bool isNulls, bool isNew)
681 : : {
682 : 2830 : bool insertedNew = false;
683 : : spgPickSplitIn in;
684 : : spgPickSplitOut out;
685 : : FmgrInfo *procinfo;
686 : : bool includeNew;
687 : : int i,
688 : : max,
689 : : n;
690 : : SpGistInnerTuple innerTuple;
691 : : SpGistNodeTuple node,
692 : : *nodes;
693 : : Buffer newInnerBuffer,
694 : : newLeafBuffer;
695 : : uint8 *leafPageSelect;
696 : : int *leafSizes;
697 : : OffsetNumber *toDelete;
698 : : OffsetNumber *toInsert;
699 : 2830 : OffsetNumber redirectTuplePos = InvalidOffsetNumber;
700 : : OffsetNumber startOffsets[2];
701 : : SpGistLeafTuple *oldLeafs;
702 : : SpGistLeafTuple *newLeafs;
703 : : Datum leafDatums[INDEX_MAX_KEYS];
704 : : bool leafIsnulls[INDEX_MAX_KEYS];
705 : : int spaceToDelete;
706 : : int currentFreeSpace;
707 : : int totalLeafSizes;
708 : : bool allTheSame;
709 : : spgxlogPickSplit xlrec;
710 : : char *leafdata,
711 : : *leafptr;
712 : : SPPageDesc saveCurrent;
713 : : int nToDelete,
714 : : nToInsert,
715 : : maxToInclude;
716 : :
717 : 2830 : in.level = level;
718 : :
719 : : /*
720 : : * Allocate per-leaf-tuple work arrays with max possible size
721 : : */
722 : 2830 : max = PageGetMaxOffsetNumber(current->page);
723 : 2830 : n = max + 1;
724 : 2830 : in.datums = (Datum *) palloc(sizeof(Datum) * n);
725 : 2830 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726 : 2830 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
1105 727 : 2830 : oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
4502 728 : 2830 : newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
729 : 2830 : leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
730 : :
731 : 2830 : STORE_STATE(state, xlrec.stateSrc);
732 : :
733 : : /*
734 : : * Form list of leaf tuples which will be distributed as split result;
735 : : * also, count up the amount of space that will be freed from current.
736 : : * (Note that in the non-root case, we won't actually delete the old
737 : : * tuples, only replace them with redirects or placeholders.)
738 : : */
739 : 2830 : nToInsert = 0;
740 : 2830 : nToDelete = 0;
741 : 2830 : spaceToDelete = 0;
4417 742 [ + + - + ]: 2830 : if (SpGistBlockIsRoot(current->blkno))
743 : : {
744 : : /*
745 : : * We are splitting the root (which up to now is also a leaf page).
746 : : * Its tuples are not linked, so scan sequentially to get them all. We
747 : : * ignore the original value of current->offnum.
748 : : */
4502 749 [ + + ]: 7709 : for (i = FirstOffsetNumber; i <= max; i++)
750 : : {
751 : : SpGistLeafTuple it;
752 : :
753 : 7668 : it = (SpGistLeafTuple) PageGetItem(current->page,
754 : : PageGetItemId(current->page, i));
755 [ + - ]: 7668 : if (it->tupstate == SPGIST_LIVE)
756 : : {
1109 757 : 7668 : in.datums[nToInsert] =
758 [ + - ]: 7668 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
1105 759 : 7668 : oldLeafs[nToInsert] = it;
4502 760 : 7668 : nToInsert++;
761 : 7668 : toDelete[nToDelete] = i;
762 : 7668 : nToDelete++;
763 : : /* we will delete the tuple altogether, so count full space */
764 : 7668 : spaceToDelete += it->size + sizeof(ItemIdData);
765 : : }
766 : : else /* tuples on root should be live */
4502 tgl@sss.pgh.pa.us 767 [ # # ]:UBC 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
768 : : }
769 : : }
770 : : else
771 : : {
772 : : /* Normal case, just collect the leaf tuples in the chain */
4502 tgl@sss.pgh.pa.us 773 :CBC 2789 : i = current->offnum;
774 [ + + ]: 354608 : while (i != InvalidOffsetNumber)
775 : : {
776 : : SpGistLeafTuple it;
777 : :
778 [ + - - + ]: 351819 : Assert(i >= FirstOffsetNumber && i <= max);
779 : 351819 : it = (SpGistLeafTuple) PageGetItem(current->page,
780 : : PageGetItemId(current->page, i));
781 [ + - ]: 351819 : if (it->tupstate == SPGIST_LIVE)
782 : : {
1109 783 : 351819 : in.datums[nToInsert] =
784 [ + - ]: 351819 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
1105 785 : 351819 : oldLeafs[nToInsert] = it;
4502 786 : 351819 : nToInsert++;
787 : 351819 : toDelete[nToDelete] = i;
788 : 351819 : nToDelete++;
789 : : /* we will not delete the tuple, only replace with dead */
790 [ - + ]: 351819 : Assert(it->size >= SGDTSIZE);
791 : 351819 : spaceToDelete += it->size - SGDTSIZE;
792 : : }
4502 tgl@sss.pgh.pa.us 793 [ # # ]:UBC 0 : else if (it->tupstate == SPGIST_DEAD)
794 : : {
795 : : /* We could see a DEAD tuple as first/only chain item */
796 [ # # ]: 0 : Assert(i == current->offnum);
1105 797 [ # # ]: 0 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
4502 798 : 0 : toDelete[nToDelete] = i;
799 : 0 : nToDelete++;
800 : : /* replacing it with redirect will save no space */
801 : : }
802 : : else
803 [ # # ]: 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
804 : :
1105 tgl@sss.pgh.pa.us 805 :CBC 351819 : i = SGLT_GET_NEXTOFFSET(it);
806 : : }
807 : : }
4502 808 : 2830 : in.nTuples = nToInsert;
809 : :
810 : : /*
811 : : * We may not actually insert new tuple because another picksplit may be
812 : : * necessary due to too large value, but we will try to allocate enough
813 : : * space to include it; and in any case it has to be included in the input
814 : : * for the picksplit function. So don't increment nToInsert yet.
815 : : */
1109 816 : 2830 : in.datums[in.nTuples] =
817 [ + - ]: 2830 : isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
1105 818 : 2830 : oldLeafs[in.nTuples] = newLeafTuple;
4502 819 : 2830 : in.nTuples++;
820 : :
821 : 2830 : memset(&out, 0, sizeof(out));
822 : :
4417 823 [ + - ]: 2830 : if (!isNulls)
824 : : {
825 : : /*
826 : : * Perform split using user-defined method.
827 : : */
828 : 2830 : procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829 : 2830 : FunctionCall2Coll(procinfo,
830 : 2830 : index->rd_indcollation[0],
831 : : PointerGetDatum(&in),
832 : : PointerGetDatum(&out));
833 : :
834 : : /*
835 : : * Form new leaf tuples and count up the total space needed.
836 : : */
837 : 2830 : totalLeafSizes = 0;
838 [ + + ]: 365147 : for (i = 0; i < in.nTuples; i++)
839 : : {
1105 840 [ + + ]: 362317 : if (state->leafTupDesc->natts > 1)
841 : 30453 : spgDeformLeafTuple(oldLeafs[i],
842 : : state->leafTupDesc,
843 : : leafDatums,
844 : : leafIsnulls,
845 : : isNulls);
846 : :
847 : 362317 : leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
848 : 362317 : leafIsnulls[spgKeyColumn] = false;
849 : :
850 : 362317 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
851 : : leafDatums,
852 : : leafIsnulls);
4417 853 : 362317 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
854 : : }
855 : : }
856 : : else
857 : : {
858 : : /*
859 : : * Perform dummy split that puts all tuples into one node.
860 : : * checkAllTheSame will override this and force allTheSame mode.
861 : : */
4417 tgl@sss.pgh.pa.us 862 :UBC 0 : out.hasPrefix = false;
863 : 0 : out.nNodes = 1;
864 : 0 : out.nodeLabels = NULL;
865 : 0 : out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
866 : :
867 : : /*
868 : : * Form new leaf tuples and count up the total space needed.
869 : : */
870 : 0 : totalLeafSizes = 0;
871 [ # # ]: 0 : for (i = 0; i < in.nTuples; i++)
872 : : {
1105 873 [ # # ]: 0 : if (state->leafTupDesc->natts > 1)
874 : 0 : spgDeformLeafTuple(oldLeafs[i],
875 : : state->leafTupDesc,
876 : : leafDatums,
877 : : leafIsnulls,
878 : : isNulls);
879 : :
880 : : /*
881 : : * Nulls tree can contain only null key values.
882 : : */
883 : 0 : leafDatums[spgKeyColumn] = (Datum) 0;
884 : 0 : leafIsnulls[spgKeyColumn] = true;
885 : :
886 : 0 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
887 : : leafDatums,
888 : : leafIsnulls);
4417 889 : 0 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
890 : : }
891 : : }
892 : :
893 : : /*
894 : : * Check to see if the picksplit function failed to separate the values,
895 : : * ie, it put them all into the same child node. If so, select allTheSame
896 : : * mode and create a random split instead. See comments for
897 : : * checkAllTheSame as to why we need to know if the new leaf tuples could
898 : : * fit on one page.
899 : : */
4502 tgl@sss.pgh.pa.us 900 :CBC 2830 : allTheSame = checkAllTheSame(&in, &out,
901 : : totalLeafSizes > SPGIST_PAGE_CAPACITY,
902 : : &includeNew);
903 : :
904 : : /*
905 : : * If checkAllTheSame decided we must exclude the new tuple, don't
906 : : * consider it any further.
907 : : */
908 [ + - ]: 2830 : if (includeNew)
909 : 2830 : maxToInclude = in.nTuples;
910 : : else
911 : : {
4502 tgl@sss.pgh.pa.us 912 :UBC 0 : maxToInclude = in.nTuples - 1;
913 : 0 : totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
914 : : }
915 : :
916 : : /*
917 : : * Allocate per-node work arrays. Since checkAllTheSame could replace
918 : : * out.nNodes with a value larger than the number of tuples on the input
919 : : * page, we can't allocate these arrays before here.
920 : : */
4502 tgl@sss.pgh.pa.us 921 :CBC 2830 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
922 : 2830 : leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
923 : :
924 : : /*
925 : : * Form nodes of inner tuple and inner tuple itself
926 : : */
927 [ + + ]: 22001 : for (i = 0; i < out.nNodes; i++)
928 : : {
929 : 19171 : Datum label = (Datum) 0;
4417 930 : 19171 : bool labelisnull = (out.nodeLabels == NULL);
931 : :
932 [ + + ]: 19171 : if (!labelisnull)
4502 933 : 1807 : label = out.nodeLabels[i];
4417 934 : 19171 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
935 : : }
4502 936 : 2830 : innerTuple = spgFormInnerTuple(state,
937 : 2830 : out.hasPrefix, out.prefixDatum,
938 : : out.nNodes, nodes);
939 : 2830 : innerTuple->allTheSame = allTheSame;
940 : :
941 : : /*
942 : : * Update nodes[] array to point into the newly formed innerTuple, so that
943 : : * we can adjust their downlinks below.
944 : : */
945 [ + + ]: 22001 : SGITITERATE(innerTuple, i, node)
946 : : {
947 : 19171 : nodes[i] = node;
948 : : }
949 : :
950 : : /*
951 : : * Re-scan new leaf tuples and count up the space needed under each node.
952 : : */
953 [ + + ]: 365147 : for (i = 0; i < maxToInclude; i++)
954 : : {
955 : 362317 : n = out.mapTuplesToNodes[i];
956 [ + - - + ]: 362317 : if (n < 0 || n >= out.nNodes)
4502 tgl@sss.pgh.pa.us 957 [ # # ]:UBC 0 : elog(ERROR, "inconsistent result of SPGiST picksplit function");
4502 tgl@sss.pgh.pa.us 958 :CBC 362317 : leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
959 : : }
960 : :
961 : : /*
962 : : * To perform the split, we must insert a new inner tuple, which can't go
963 : : * on a leaf page; and unless we are splitting the root page, we must then
964 : : * update the parent tuple's downlink to point to the inner tuple. If
965 : : * there is room, we'll put the new inner tuple on the same page as the
966 : : * parent tuple, otherwise we need another non-leaf buffer. But if the
967 : : * parent page is the root, we can't add the new inner tuple there,
968 : : * because the root page must have only one inner tuple.
969 : : */
970 : 2830 : xlrec.initInner = false;
971 [ + + ]: 2830 : if (parent->buffer != InvalidBuffer &&
4417 972 [ + + + - ]: 2789 : !SpGistBlockIsRoot(parent->blkno) &&
4502 973 [ + - ]: 2606 : (SpGistPageGetFreeSpace(parent->page, 1) >=
974 [ + + ]: 2606 : innerTuple->size + sizeof(ItemIdData)))
975 : : {
976 : : /* New inner tuple will fit on parent page */
977 : 2402 : newInnerBuffer = parent->buffer;
978 : : }
979 [ + + ]: 428 : else if (parent->buffer != InvalidBuffer)
980 : : {
981 : : /* Send tuple to page with next triple parity (see README) */
982 : 774 : newInnerBuffer = SpGistGetBuffer(index,
2489 983 : 387 : GBUF_INNER_PARITY(parent->blkno + 1) |
4417 tgl@sss.pgh.pa.us 984 :UBC 0 : (isNulls ? GBUF_NULLS : 0),
2489 tgl@sss.pgh.pa.us 985 [ - + ]:CBC 387 : innerTuple->size + sizeof(ItemIdData),
986 : : &xlrec.initInner);
987 : : }
988 : : else
989 : : {
990 : : /* Root page split ... inner tuple will go to root page */
4502 991 : 41 : newInnerBuffer = InvalidBuffer;
992 : : }
993 : :
994 : : /*
995 : : * The new leaf tuples converted from the existing ones should require the
996 : : * same or less space, and therefore should all fit onto one page
997 : : * (although that's not necessarily the current page, since we can't
998 : : * delete the old tuples but only replace them with placeholders).
999 : : * However, the incoming new tuple might not also fit, in which case we
1000 : : * might need another picksplit cycle to reduce it some more.
1001 : : *
1002 : : * If there's not room to put everything back onto the current page, then
1003 : : * we decide on a per-node basis which tuples go to the new page. (We do
1004 : : * it like that because leaf tuple chains can't cross pages, so we must
1005 : : * place all leaf tuples belonging to the same parent node on the same
1006 : : * page.)
1007 : : *
1008 : : * If we are splitting the root page (turning it from a leaf page into an
1009 : : * inner page), then no leaf tuples can go back to the current page; they
1010 : : * must all go somewhere else.
1011 : : */
4417 1012 [ + + + - ]: 2830 : if (!SpGistBlockIsRoot(current->blkno))
4502 1013 : 2789 : currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1014 : : else
1015 : 41 : currentFreeSpace = 0; /* prevent assigning any tuples to current */
1016 : :
1017 : 2830 : xlrec.initDest = false;
1018 : :
1019 [ + + ]: 2830 : if (totalLeafSizes <= currentFreeSpace)
1020 : : {
1021 : : /* All the leaf tuples will fit on current page */
1022 : 12 : newLeafBuffer = InvalidBuffer;
1023 : : /* mark new leaf tuple as included in insertions, if allowed */
1024 [ + - ]: 12 : if (includeNew)
1025 : : {
1026 : 12 : nToInsert++;
1027 : 12 : insertedNew = true;
1028 : : }
1029 [ + + ]: 1485 : for (i = 0; i < nToInsert; i++)
2489 1030 : 1473 : leafPageSelect[i] = 0; /* signifies current page */
1031 : : }
4502 1032 [ + + + - ]: 2818 : else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1033 : : {
1034 : : /*
1035 : : * We're trying to split up a long value by repeated suffixing, but
1036 : : * it's not going to fit yet. Don't bother allocating a second leaf
1037 : : * buffer that we won't be able to use.
1038 : : */
1039 : 22 : newLeafBuffer = InvalidBuffer;
1040 [ - + ]: 22 : Assert(includeNew);
1041 [ - + ]: 22 : Assert(nToInsert == 0);
1042 : : }
1043 : : else
1044 : : {
1045 : : /* We will need another leaf page */
1046 : : uint8 *nodePageSelect;
1047 : : int curspace;
1048 : : int newspace;
1049 : :
4417 1050 [ - + ]: 2796 : newLeafBuffer = SpGistGetBuffer(index,
1051 : : GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
4502 1052 [ + + ]: 2796 : Min(totalLeafSizes,
1053 : : SPGIST_PAGE_CAPACITY),
1054 : : &xlrec.initDest);
1055 : :
1056 : : /*
1057 : : * Attempt to assign node groups to the two pages. We might fail to
1058 : : * do so, even if totalLeafSizes is less than the available space,
1059 : : * because we can't split a group across pages.
1060 : : */
1061 : 2796 : nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1062 : :
1063 : 2796 : curspace = currentFreeSpace;
2916 kgrittn@postgresql.o 1064 : 2796 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
4502 tgl@sss.pgh.pa.us 1065 [ + + ]: 21921 : for (i = 0; i < out.nNodes; i++)
1066 : : {
1067 [ + + ]: 19125 : if (leafSizes[i] <= curspace)
1068 : : {
4326 bruce@momjian.us 1069 : 12427 : nodePageSelect[i] = 0; /* signifies current page */
4502 tgl@sss.pgh.pa.us 1070 : 12427 : curspace -= leafSizes[i];
1071 : : }
1072 : : else
1073 : : {
4326 bruce@momjian.us 1074 : 6698 : nodePageSelect[i] = 1; /* signifies new leaf page */
4502 tgl@sss.pgh.pa.us 1075 : 6698 : newspace -= leafSizes[i];
1076 : : }
1077 : : }
1078 [ + - + + ]: 2796 : if (curspace >= 0 && newspace >= 0)
1079 : : {
1080 : : /* Successful assignment, so we can include the new leaf tuple */
1081 [ + - ]: 2676 : if (includeNew)
1082 : : {
1083 : 2676 : nToInsert++;
1084 : 2676 : insertedNew = true;
1085 : : }
1086 : : }
1087 [ + - ]: 120 : else if (includeNew)
1088 : : {
1089 : : /* We must exclude the new leaf tuple from the split */
4326 bruce@momjian.us 1090 : 120 : int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1091 : :
4502 tgl@sss.pgh.pa.us 1092 : 120 : leafSizes[nodeOfNewTuple] -=
1093 : 120 : newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1094 : :
1095 : : /* Repeat the node assignment process --- should succeed now */
1096 : 120 : curspace = currentFreeSpace;
2916 kgrittn@postgresql.o 1097 : 120 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
4502 tgl@sss.pgh.pa.us 1098 [ + + ]: 582 : for (i = 0; i < out.nNodes; i++)
1099 : : {
1100 [ + + ]: 462 : if (leafSizes[i] <= curspace)
1101 : : {
2489 1102 : 152 : nodePageSelect[i] = 0; /* signifies current page */
4502 1103 : 152 : curspace -= leafSizes[i];
1104 : : }
1105 : : else
1106 : : {
2489 1107 : 310 : nodePageSelect[i] = 1; /* signifies new leaf page */
4502 1108 : 310 : newspace -= leafSizes[i];
1109 : : }
1110 : : }
1111 [ + - - + ]: 120 : if (curspace < 0 || newspace < 0)
4502 tgl@sss.pgh.pa.us 1112 [ # # ]:UBC 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1113 : : }
1114 : : else
1115 : : {
1116 : : /* oops, we already excluded new tuple ... should not get here */
1117 [ # # ]: 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1118 : : }
1119 : : /* Expand the per-node assignments to be shown per leaf tuple */
4502 tgl@sss.pgh.pa.us 1120 [ + + ]:CBC 363498 : for (i = 0; i < nToInsert; i++)
1121 : : {
1122 : 360702 : n = out.mapTuplesToNodes[i];
1123 : 360702 : leafPageSelect[i] = nodePageSelect[n];
1124 : : }
1125 : : }
1126 : :
1127 : : /* Start preparing WAL record */
1128 : 2830 : xlrec.nDelete = 0;
1129 : 2830 : xlrec.initSrc = isNew;
4417 1130 : 2830 : xlrec.storesNulls = isNulls;
3433 heikki.linnakangas@i 1131 [ + + - + ]: 2830 : xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1132 : :
4502 tgl@sss.pgh.pa.us 1133 : 2830 : leafdata = leafptr = (char *) palloc(totalLeafSizes);
1134 : :
1135 : : /* Here we begin making the changes to the target pages */
1136 : 2830 : START_CRIT_SECTION();
1137 : :
1138 : : /*
1139 : : * Delete old leaf tuples from current buffer, except when we're splitting
1140 : : * the root; in that case there's no need because we'll re-init the page
1141 : : * below. We do this first to make room for reinserting new leaf tuples.
1142 : : */
4417 1143 [ + + + - ]: 2830 : if (!SpGistBlockIsRoot(current->blkno))
1144 : : {
1145 : : /*
1146 : : * Init buffer instead of deleting individual tuples, but only if
1147 : : * there aren't any other live tuples and only during build; otherwise
1148 : : * we need to set a redirection tuple for concurrent scans.
1149 : : */
4502 1150 [ + + ]: 2789 : if (state->isBuild &&
1151 : 2117 : nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1152 [ + + ]: 2117 : PageGetMaxOffsetNumber(current->page))
1153 : : {
4417 1154 [ - + ]: 152 : SpGistInitBuffer(current->buffer,
1155 : : SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
4502 1156 : 152 : xlrec.initSrc = true;
1157 : : }
1158 [ + + ]: 2637 : else if (isNew)
1159 : : {
1160 : : /* don't expose the freshly init'd buffer as a backup block */
1161 [ - + ]: 22 : Assert(nToDelete == 0);
1162 : : }
1163 : : else
1164 : : {
1165 : 2615 : xlrec.nDelete = nToDelete;
1166 : :
1167 [ + + ]: 2615 : if (!state->isBuild)
1168 : : {
1169 : : /*
1170 : : * Need to create redirect tuple (it will point to new inner
1171 : : * tuple) but right now the new tuple's location is not known
1172 : : * yet. So, set the redirection pointer to "impossible" value
1173 : : * and remember its position to update tuple later.
1174 : : */
1175 [ + - ]: 650 : if (nToDelete > 0)
1176 : 650 : redirectTuplePos = toDelete[0];
1177 : 650 : spgPageIndexMultiDelete(state, current->page,
1178 : : toDelete, nToDelete,
1179 : : SPGIST_REDIRECT,
1180 : : SPGIST_PLACEHOLDER,
1181 : : SPGIST_METAPAGE_BLKNO,
1182 : : FirstOffsetNumber);
1183 : : }
1184 : : else
1185 : : {
1186 : : /*
1187 : : * During index build there is not concurrent searches, so we
1188 : : * don't need to create redirection tuple.
1189 : : */
1190 : 1965 : spgPageIndexMultiDelete(state, current->page,
1191 : : toDelete, nToDelete,
1192 : : SPGIST_PLACEHOLDER,
1193 : : SPGIST_PLACEHOLDER,
1194 : : InvalidBlockNumber,
1195 : : InvalidOffsetNumber);
1196 : : }
1197 : : }
1198 : : }
1199 : :
1200 : : /*
1201 : : * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1202 : : * nodes.
1203 : : */
1204 : 2830 : startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1205 [ + + ]: 365005 : for (i = 0; i < nToInsert; i++)
1206 : : {
1207 : 362175 : SpGistLeafTuple it = newLeafs[i];
1208 : : Buffer leafBuffer;
1209 : : BlockNumber leafBlock;
1210 : : OffsetNumber newoffset;
1211 : :
1212 : : /* Which page is it going to? */
1213 [ + + ]: 362175 : leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1214 : 362175 : leafBlock = BufferGetBlockNumber(leafBuffer);
1215 : :
1216 : : /* Link tuple into correct chain for its node */
1217 : 362175 : n = out.mapTuplesToNodes[i];
1218 : :
1219 [ + + ]: 362175 : if (ItemPointerIsValid(&nodes[n]->t_tid))
1220 : : {
1221 [ - + ]: 351314 : Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1105 1222 : 351314 : SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1223 : : }
1224 : : else
1225 : 10861 : SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
1226 : :
1227 : : /* Insert it on page */
2916 kgrittn@postgresql.o 1228 : 362175 : newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
4502 tgl@sss.pgh.pa.us 1229 : 362175 : (Item) it, it->size,
1230 : 362175 : &startOffsets[leafPageSelect[i]],
1231 : : false);
1232 : 362175 : toInsert[i] = newoffset;
1233 : :
1234 : : /* ... and complete the chain linking */
1235 : 362175 : ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1236 : :
1237 : : /* Also copy leaf tuple into WAL data */
1238 : 362175 : memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1239 : 362175 : leafptr += newLeafs[i]->size;
1240 : : }
1241 : :
1242 : : /*
1243 : : * We're done modifying the other leaf buffer (if any), so mark it dirty.
1244 : : * current->buffer will be marked below, after we're entirely done
1245 : : * modifying it.
1246 : : */
1247 [ + + ]: 2830 : if (newLeafBuffer != InvalidBuffer)
1248 : : {
1249 : 2796 : MarkBufferDirty(newLeafBuffer);
1250 : : }
1251 : :
1252 : : /* Remember current buffer, since we're about to change "current" */
1253 : 2830 : saveCurrent = *current;
1254 : :
1255 : : /*
1256 : : * Store the new innerTuple
1257 : : */
1258 [ + + + + ]: 2830 : if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1259 : : {
1260 : : /*
1261 : : * new inner tuple goes to parent page
1262 : : */
1263 [ - + ]: 2402 : Assert(current->buffer != parent->buffer);
1264 : :
1265 : : /* Repoint "current" at the new inner tuple */
1266 : 2402 : current->blkno = parent->blkno;
1267 : 2402 : current->buffer = parent->buffer;
1268 : 2402 : current->page = parent->page;
1269 : 2402 : xlrec.offnumInner = current->offnum =
1270 : 2402 : SpGistPageAddNewItem(state, current->page,
1271 : 2402 : (Item) innerTuple, innerTuple->size,
1272 : : NULL, false);
1273 : :
1274 : : /*
1275 : : * Update parent node link and mark parent page dirty
1276 : : */
3433 heikki.linnakangas@i 1277 : 2402 : xlrec.innerIsParent = true;
4502 tgl@sss.pgh.pa.us 1278 : 2402 : xlrec.offnumParent = parent->offnum;
1279 : 2402 : xlrec.nodeI = parent->node;
1280 : 2402 : saveNodeLink(index, parent, current->blkno, current->offnum);
1281 : :
1282 : : /*
1283 : : * Update redirection link (in old current buffer)
1284 : : */
1285 [ + + ]: 2402 : if (redirectTuplePos != InvalidOffsetNumber)
1286 : 585 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1287 : 585 : current->blkno, current->offnum);
1288 : :
1289 : : /* Done modifying old current buffer, mark it dirty */
1290 : 2402 : MarkBufferDirty(saveCurrent.buffer);
1291 : : }
1292 [ + + ]: 428 : else if (parent->buffer != InvalidBuffer)
1293 : : {
1294 : : /*
1295 : : * new inner tuple will be stored on a new page
1296 : : */
1297 [ - + ]: 387 : Assert(newInnerBuffer != InvalidBuffer);
1298 : :
1299 : : /* Repoint "current" at the new inner tuple */
1300 : 387 : current->buffer = newInnerBuffer;
1301 : 387 : current->blkno = BufferGetBlockNumber(current->buffer);
2916 kgrittn@postgresql.o 1302 : 387 : current->page = BufferGetPage(current->buffer);
4502 tgl@sss.pgh.pa.us 1303 : 387 : xlrec.offnumInner = current->offnum =
1304 : 387 : SpGistPageAddNewItem(state, current->page,
1305 : 387 : (Item) innerTuple, innerTuple->size,
1306 : : NULL, false);
1307 : :
1308 : : /* Done modifying new current buffer, mark it dirty */
1309 : 387 : MarkBufferDirty(current->buffer);
1310 : :
1311 : : /*
1312 : : * Update parent node link and mark parent page dirty
1313 : : */
3433 heikki.linnakangas@i 1314 : 387 : xlrec.innerIsParent = (parent->buffer == current->buffer);
4502 tgl@sss.pgh.pa.us 1315 : 387 : xlrec.offnumParent = parent->offnum;
1316 : 387 : xlrec.nodeI = parent->node;
1317 : 387 : saveNodeLink(index, parent, current->blkno, current->offnum);
1318 : :
1319 : : /*
1320 : : * Update redirection link (in old current buffer)
1321 : : */
1322 [ + + ]: 387 : if (redirectTuplePos != InvalidOffsetNumber)
1323 : 65 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1324 : 65 : current->blkno, current->offnum);
1325 : :
1326 : : /* Done modifying old current buffer, mark it dirty */
1327 : 387 : MarkBufferDirty(saveCurrent.buffer);
1328 : : }
1329 : : else
1330 : : {
1331 : : /*
1332 : : * Splitting root page, which was a leaf but now becomes inner page
1333 : : * (and so "current" continues to point at it)
1334 : : */
4417 1335 [ - + - - ]: 41 : Assert(SpGistBlockIsRoot(current->blkno));
4502 1336 [ - + ]: 41 : Assert(redirectTuplePos == InvalidOffsetNumber);
1337 : :
4417 1338 [ - + ]: 41 : SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
4502 1339 : 41 : xlrec.initInner = true;
3433 heikki.linnakangas@i 1340 : 41 : xlrec.innerIsParent = false;
1341 : :
4502 tgl@sss.pgh.pa.us 1342 : 41 : xlrec.offnumInner = current->offnum =
1343 : 41 : PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1344 : : InvalidOffsetNumber, false, false);
1345 [ - + ]: 41 : if (current->offnum != FirstOffsetNumber)
4502 tgl@sss.pgh.pa.us 1346 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1347 : : innerTuple->size);
1348 : :
1349 : : /* No parent link to update, nor redirection to do */
4502 tgl@sss.pgh.pa.us 1350 :CBC 41 : xlrec.offnumParent = InvalidOffsetNumber;
1351 : 41 : xlrec.nodeI = 0;
1352 : :
1353 : : /* Done modifying new current buffer, mark it dirty */
1354 : 41 : MarkBufferDirty(current->buffer);
1355 : :
1356 : : /* saveCurrent doesn't represent a different buffer */
1357 : 41 : saveCurrent.buffer = InvalidBuffer;
1358 : : }
1359 : :
1838 heikki.linnakangas@i 1360 [ + + + + : 2830 : if (RelationNeedsWAL(index) && !state->isBuild)
+ + + + +
+ ]
1361 : : {
1362 : : XLogRecPtr recptr;
1363 : : int flags;
1364 : :
3433 1365 : 682 : XLogBeginInsert();
1366 : :
1367 : 682 : xlrec.nInsert = nToInsert;
1368 : 682 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1369 : :
1370 : 682 : XLogRegisterData((char *) toDelete,
1371 : 682 : sizeof(OffsetNumber) * xlrec.nDelete);
1372 : 682 : XLogRegisterData((char *) toInsert,
1373 : 682 : sizeof(OffsetNumber) * xlrec.nInsert);
1374 : 682 : XLogRegisterData((char *) leafPageSelect,
1375 : 682 : sizeof(uint8) * xlrec.nInsert);
1376 : 682 : XLogRegisterData((char *) innerTuple, innerTuple->size);
1377 : 682 : XLogRegisterData(leafdata, leafptr - leafdata);
1378 : :
1379 : : /* Old leaf page */
1380 [ + + ]: 682 : if (BufferIsValid(saveCurrent.buffer))
1381 : : {
3191 1382 : 672 : flags = REGBUF_STANDARD;
1383 [ + + ]: 672 : if (xlrec.initSrc)
1384 : 22 : flags |= REGBUF_WILL_INIT;
3433 1385 : 672 : XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1386 : : }
1387 : :
1388 : : /* New leaf page */
1389 [ + + ]: 682 : if (BufferIsValid(newLeafBuffer))
1390 : : {
1391 : 660 : flags = REGBUF_STANDARD;
1392 [ + + ]: 660 : if (xlrec.initDest)
1393 : 616 : flags |= REGBUF_WILL_INIT;
1394 : 660 : XLogRegisterBuffer(1, newLeafBuffer, flags);
1395 : : }
1396 : :
1397 : : /* Inner page */
3191 1398 : 682 : flags = REGBUF_STANDARD;
1399 [ + + ]: 682 : if (xlrec.initInner)
1400 : 20 : flags |= REGBUF_WILL_INIT;
1401 : 682 : XLogRegisterBuffer(2, current->buffer, flags);
1402 : :
1403 : : /* Parent page, if different from inner page */
3433 1404 [ + + ]: 682 : if (parent->buffer != InvalidBuffer)
1405 : : {
1406 [ + + ]: 672 : if (parent->buffer != current->buffer)
1407 : 65 : XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1408 : : else
1409 [ - + ]: 607 : Assert(xlrec.innerIsParent);
1410 : : }
1411 : :
1412 : : /* Issue the WAL record */
1413 : 682 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1414 : :
1415 : : /* Update page LSNs on all affected pages */
4502 tgl@sss.pgh.pa.us 1416 [ + + ]: 682 : if (newLeafBuffer != InvalidBuffer)
1417 : : {
2916 kgrittn@postgresql.o 1418 : 660 : Page page = BufferGetPage(newLeafBuffer);
1419 : :
4502 tgl@sss.pgh.pa.us 1420 : 660 : PageSetLSN(page, recptr);
1421 : : }
1422 : :
1423 [ + + ]: 682 : if (saveCurrent.buffer != InvalidBuffer)
1424 : : {
2916 kgrittn@postgresql.o 1425 : 672 : Page page = BufferGetPage(saveCurrent.buffer);
1426 : :
4502 tgl@sss.pgh.pa.us 1427 : 672 : PageSetLSN(page, recptr);
1428 : : }
1429 : :
1430 : 682 : PageSetLSN(current->page, recptr);
1431 : :
1432 [ + + ]: 682 : if (parent->buffer != InvalidBuffer)
1433 : : {
1434 : 672 : PageSetLSN(parent->page, recptr);
1435 : : }
1436 : : }
1437 : :
1438 [ - + ]: 2830 : END_CRIT_SECTION();
1439 : :
1440 : : /* Update local free-space cache and unlock buffers */
1441 [ + + ]: 2830 : if (newLeafBuffer != InvalidBuffer)
1442 : : {
1443 : 2796 : SpGistSetLastUsedPage(index, newLeafBuffer);
1444 : 2796 : UnlockReleaseBuffer(newLeafBuffer);
1445 : : }
1446 [ + + ]: 2830 : if (saveCurrent.buffer != InvalidBuffer)
1447 : : {
1448 : 2789 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1449 : 2789 : UnlockReleaseBuffer(saveCurrent.buffer);
1450 : : }
1451 : :
1452 : 2830 : return insertedNew;
1453 : : }
1454 : :
1455 : : /*
1456 : : * spgMatchNode action: descend to N'th child node of current inner tuple
1457 : : */
1458 : : static void
1459 : 9328411 : spgMatchNodeAction(Relation index, SpGistState *state,
1460 : : SpGistInnerTuple innerTuple,
1461 : : SPPageDesc *current, SPPageDesc *parent, int nodeN)
1462 : : {
1463 : : int i;
1464 : : SpGistNodeTuple node;
1465 : :
1466 : : /* Release previous parent buffer if any */
1467 [ + + ]: 9328411 : if (parent->buffer != InvalidBuffer &&
1468 [ + + ]: 8934219 : parent->buffer != current->buffer)
1469 : : {
1470 : 391889 : SpGistSetLastUsedPage(index, parent->buffer);
1471 : 391889 : UnlockReleaseBuffer(parent->buffer);
1472 : : }
1473 : :
1474 : : /* Repoint parent to specified node of current inner tuple */
1475 : 9328411 : parent->blkno = current->blkno;
1476 : 9328411 : parent->buffer = current->buffer;
1477 : 9328411 : parent->page = current->page;
1478 : 9328411 : parent->offnum = current->offnum;
1479 : 9328411 : parent->node = nodeN;
1480 : :
1481 : : /* Locate that node */
1482 [ + - ]: 15627517 : SGITITERATE(innerTuple, i, node)
1483 : : {
1484 [ + + ]: 15627517 : if (i == nodeN)
1485 : 9328411 : break;
1486 : : }
1487 : :
1488 [ - + ]: 9328411 : if (i != nodeN)
4502 tgl@sss.pgh.pa.us 1489 [ # # ]:UBC 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1490 : : nodeN);
1491 : :
1492 : : /* Point current to the downlink location, if any */
4502 tgl@sss.pgh.pa.us 1493 [ + + ]:CBC 9328411 : if (ItemPointerIsValid(&node->t_tid))
1494 : : {
1495 : 9327369 : current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1496 : 9327369 : current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1497 : : }
1498 : : else
1499 : : {
1500 : : /* Downlink is empty, so we'll need to find a new page */
1501 : 1042 : current->blkno = InvalidBlockNumber;
1502 : 1042 : current->offnum = InvalidOffsetNumber;
1503 : : }
1504 : :
1505 : 9328411 : current->buffer = InvalidBuffer;
1506 : 9328411 : current->page = NULL;
1507 : 9328411 : }
1508 : :
1509 : : /*
1510 : : * spgAddNode action: add a node to the inner tuple at current
1511 : : */
1512 : : static void
1513 : 789 : spgAddNodeAction(Relation index, SpGistState *state,
1514 : : SpGistInnerTuple innerTuple,
1515 : : SPPageDesc *current, SPPageDesc *parent,
1516 : : int nodeN, Datum nodeLabel)
1517 : : {
1518 : : SpGistInnerTuple newInnerTuple;
1519 : : spgxlogAddNode xlrec;
1520 : :
1521 : : /* Should not be applied to nulls */
4417 1522 [ - + ]: 789 : Assert(!SpGistPageStoresNulls(current->page));
1523 : :
1524 : : /* Construct new inner tuple with additional node */
4502 1525 : 789 : newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1526 : :
1527 : : /* Prepare WAL record */
1528 : 789 : STORE_STATE(state, xlrec.stateSrc);
1529 : 789 : xlrec.offnum = current->offnum;
1530 : :
1531 : : /* we don't fill these unless we need to change the parent downlink */
3433 heikki.linnakangas@i 1532 : 789 : xlrec.parentBlk = -1;
4502 tgl@sss.pgh.pa.us 1533 : 789 : xlrec.offnumParent = InvalidOffsetNumber;
1534 : 789 : xlrec.nodeI = 0;
1535 : :
1536 : : /* we don't fill these unless tuple has to be moved */
1537 : 789 : xlrec.offnumNew = InvalidOffsetNumber;
1538 : 789 : xlrec.newPage = false;
1539 : :
1540 : 789 : if (PageGetExactFreeSpace(current->page) >=
1541 [ + + ]: 789 : newInnerTuple->size - innerTuple->size)
1542 : : {
1543 : : /*
1544 : : * We can replace the inner tuple by new version in-place
1545 : : */
1546 : 786 : START_CRIT_SECTION();
1547 : :
1548 : 786 : PageIndexTupleDelete(current->page, current->offnum);
1549 : 786 : if (PageAddItem(current->page,
1550 : : (Item) newInnerTuple, newInnerTuple->size,
1551 [ - + ]: 786 : current->offnum, false, false) != current->offnum)
4502 tgl@sss.pgh.pa.us 1552 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1553 : : newInnerTuple->size);
1554 : :
4502 tgl@sss.pgh.pa.us 1555 :CBC 786 : MarkBufferDirty(current->buffer);
1556 : :
1838 heikki.linnakangas@i 1557 [ + - + + : 786 : if (RelationNeedsWAL(index) && !state->isBuild)
+ + + - +
+ ]
1558 : : {
1559 : : XLogRecPtr recptr;
1560 : :
3433 1561 : 353 : XLogBeginInsert();
1562 : 353 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1563 : 353 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1564 : :
1565 : 353 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1566 : :
1567 : 353 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1568 : :
4502 tgl@sss.pgh.pa.us 1569 : 353 : PageSetLSN(current->page, recptr);
1570 : : }
1571 : :
1572 [ - + ]: 786 : END_CRIT_SECTION();
1573 : : }
1574 : : else
1575 : : {
1576 : : /*
1577 : : * move inner tuple to another page, and update parent
1578 : : */
1579 : : SpGistDeadTuple dt;
1580 : : SPPageDesc saveCurrent;
1581 : :
1582 : : /*
1583 : : * It should not be possible to get here for the root page, since we
1584 : : * allow only one inner tuple on the root page, and spgFormInnerTuple
1585 : : * always checks that inner tuples don't exceed the size of a page.
1586 : : */
4417 1587 [ + - - + ]: 3 : if (SpGistBlockIsRoot(current->blkno))
4502 tgl@sss.pgh.pa.us 1588 [ # # ]:UBC 0 : elog(ERROR, "cannot enlarge root tuple any more");
4502 tgl@sss.pgh.pa.us 1589 [ - + ]:CBC 3 : Assert(parent->buffer != InvalidBuffer);
1590 : :
1591 : 3 : saveCurrent = *current;
1592 : :
1593 : 3 : xlrec.offnumParent = parent->offnum;
1594 : 3 : xlrec.nodeI = parent->node;
1595 : :
1596 : : /*
1597 : : * obtain new buffer with the same parity as current, since it will be
1598 : : * a child of same parent tuple
1599 : : */
1600 : 6 : current->buffer = SpGistGetBuffer(index,
1601 : 3 : GBUF_INNER_PARITY(current->blkno),
2489 1602 : 3 : newInnerTuple->size + sizeof(ItemIdData),
1603 : : &xlrec.newPage);
4502 1604 : 3 : current->blkno = BufferGetBlockNumber(current->buffer);
2916 kgrittn@postgresql.o 1605 : 3 : current->page = BufferGetPage(current->buffer);
1606 : :
1607 : : /*
1608 : : * Let's just make real sure new current isn't same as old. Right now
1609 : : * that's impossible, but if SpGistGetBuffer ever got smart enough to
1610 : : * delete placeholder tuples before checking space, maybe it wouldn't
1611 : : * be impossible. The case would appear to work except that WAL
1612 : : * replay would be subtly wrong, so I think a mere assert isn't enough
1613 : : * here.
1614 : : */
3433 heikki.linnakangas@i 1615 [ - + ]: 3 : if (current->blkno == saveCurrent.blkno)
4326 bruce@momjian.us 1616 [ # # ]:UBC 0 : elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1617 : :
1618 : : /*
1619 : : * New current and parent buffer will both be modified; but note that
1620 : : * parent buffer could be same as either new or old current.
1621 : : */
3433 heikki.linnakangas@i 1622 [ - + ]:CBC 3 : if (parent->buffer == saveCurrent.buffer)
3433 heikki.linnakangas@i 1623 :UBC 0 : xlrec.parentBlk = 0;
3433 heikki.linnakangas@i 1624 [ - + ]:CBC 3 : else if (parent->buffer == current->buffer)
3433 heikki.linnakangas@i 1625 :UBC 0 : xlrec.parentBlk = 1;
1626 : : else
3433 heikki.linnakangas@i 1627 :CBC 3 : xlrec.parentBlk = 2;
1628 : :
4502 tgl@sss.pgh.pa.us 1629 : 3 : START_CRIT_SECTION();
1630 : :
1631 : : /* insert new ... */
1632 : 3 : xlrec.offnumNew = current->offnum =
1633 : 3 : SpGistPageAddNewItem(state, current->page,
1634 : 3 : (Item) newInnerTuple, newInnerTuple->size,
1635 : : NULL, false);
1636 : :
1637 : 3 : MarkBufferDirty(current->buffer);
1638 : :
1639 : : /* update parent's downlink and mark parent page dirty */
1640 : 3 : saveNodeLink(index, parent, current->blkno, current->offnum);
1641 : :
1642 : : /*
1643 : : * Replace old tuple with a placeholder or redirection tuple. Unless
1644 : : * doing an index build, we have to insert a redirection tuple for
1645 : : * possible concurrent scans. We can't just delete it in any case,
1646 : : * because that could change the offsets of other tuples on the page,
1647 : : * breaking downlinks from their parents.
1648 : : */
1649 [ - + ]: 3 : if (state->isBuild)
4502 tgl@sss.pgh.pa.us 1650 :UBC 0 : dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1651 : : InvalidBlockNumber, InvalidOffsetNumber);
1652 : : else
4502 tgl@sss.pgh.pa.us 1653 :CBC 3 : dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1654 : 3 : current->blkno, current->offnum);
1655 : :
1656 : 3 : PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1657 : 3 : if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1658 : : saveCurrent.offnum,
1659 [ - + ]: 3 : false, false) != saveCurrent.offnum)
4502 tgl@sss.pgh.pa.us 1660 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1661 : : dt->size);
1662 : :
4502 tgl@sss.pgh.pa.us 1663 [ - + ]:CBC 3 : if (state->isBuild)
4502 tgl@sss.pgh.pa.us 1664 :UBC 0 : SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1665 : : else
4502 tgl@sss.pgh.pa.us 1666 :CBC 3 : SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1667 : :
1668 : 3 : MarkBufferDirty(saveCurrent.buffer);
1669 : :
1838 heikki.linnakangas@i 1670 [ + - + + : 3 : if (RelationNeedsWAL(index) && !state->isBuild)
+ - + - +
- ]
1671 : : {
1672 : : XLogRecPtr recptr;
1673 : : int flags;
1674 : :
3433 1675 : 3 : XLogBeginInsert();
1676 : :
1677 : : /* orig page */
1678 : 3 : XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1679 : : /* new page */
3191 1680 : 3 : flags = REGBUF_STANDARD;
1681 [ + - ]: 3 : if (xlrec.newPage)
1682 : 3 : flags |= REGBUF_WILL_INIT;
1683 : 3 : XLogRegisterBuffer(1, current->buffer, flags);
1684 : : /* parent page (if different from orig and new) */
3433 1685 [ + - ]: 3 : if (xlrec.parentBlk == 2)
1686 : 3 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1687 : :
1688 : 3 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1689 : 3 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1690 : :
1691 : 3 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1692 : :
1693 : : /* we don't bother to check if any of these are redundant */
4502 tgl@sss.pgh.pa.us 1694 : 3 : PageSetLSN(current->page, recptr);
1695 : 3 : PageSetLSN(parent->page, recptr);
1696 : 3 : PageSetLSN(saveCurrent.page, recptr);
1697 : : }
1698 : :
1699 [ - + ]: 3 : END_CRIT_SECTION();
1700 : :
1701 : : /* Release saveCurrent if it's not same as current or parent */
1702 [ + - ]: 3 : if (saveCurrent.buffer != current->buffer &&
1703 [ + - ]: 3 : saveCurrent.buffer != parent->buffer)
1704 : : {
1705 : 3 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1706 : 3 : UnlockReleaseBuffer(saveCurrent.buffer);
1707 : : }
1708 : : }
1709 : 789 : }
1710 : :
1711 : : /*
1712 : : * spgSplitNode action: split inner tuple at current into prefix and postfix
1713 : : */
1714 : : static void
1715 : 323 : spgSplitNodeAction(Relation index, SpGistState *state,
1716 : : SpGistInnerTuple innerTuple,
1717 : : SPPageDesc *current, spgChooseOut *out)
1718 : : {
1719 : : SpGistInnerTuple prefixTuple,
1720 : : postfixTuple;
1721 : : SpGistNodeTuple node,
1722 : : *nodes;
1723 : : BlockNumber postfixBlkno;
1724 : : OffsetNumber postfixOffset;
1725 : : int i;
1726 : : spgxlogSplitTuple xlrec;
1727 : 323 : Buffer newBuffer = InvalidBuffer;
1728 : :
1729 : : /* Should not be applied to nulls */
4417 1730 [ - + ]: 323 : Assert(!SpGistPageStoresNulls(current->page));
1731 : :
1732 : : /* Check opclass gave us sane values */
2791 1733 [ + - ]: 323 : if (out->result.splitTuple.prefixNNodes <= 0 ||
1734 [ - + ]: 323 : out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
2791 tgl@sss.pgh.pa.us 1735 [ # # ]:UBC 0 : elog(ERROR, "invalid number of prefix nodes: %d",
1736 : : out->result.splitTuple.prefixNNodes);
2791 tgl@sss.pgh.pa.us 1737 [ + - ]:CBC 323 : if (out->result.splitTuple.childNodeN < 0 ||
1738 : 323 : out->result.splitTuple.childNodeN >=
1739 [ - + ]: 323 : out->result.splitTuple.prefixNNodes)
2791 tgl@sss.pgh.pa.us 1740 [ # # ]:UBC 0 : elog(ERROR, "invalid child node number: %d",
1741 : : out->result.splitTuple.childNodeN);
1742 : :
1743 : : /*
1744 : : * Construct new prefix tuple with requested number of nodes. We'll fill
1745 : : * in the childNodeN'th node's downlink below.
1746 : : */
2791 tgl@sss.pgh.pa.us 1747 :CBC 323 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1748 : 323 : out->result.splitTuple.prefixNNodes);
1749 : :
1750 [ + + ]: 646 : for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1751 : : {
1752 : 323 : Datum label = (Datum) 0;
1753 : : bool labelisnull;
1754 : :
1755 : 323 : labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1756 [ + - ]: 323 : if (!labelisnull)
1757 : 323 : label = out->result.splitTuple.prefixNodeLabels[i];
1758 : 323 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1759 : : }
1760 : :
4502 1761 : 323 : prefixTuple = spgFormInnerTuple(state,
1762 : 323 : out->result.splitTuple.prefixHasPrefix,
1763 : : out->result.splitTuple.prefixPrefixDatum,
1764 : : out->result.splitTuple.prefixNNodes,
1765 : : nodes);
1766 : :
1767 : : /* it must fit in the space that innerTuple now occupies */
1768 [ - + ]: 323 : if (prefixTuple->size > innerTuple->size)
4502 tgl@sss.pgh.pa.us 1769 [ # # ]:UBC 0 : elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1770 : :
1771 : : /*
1772 : : * Construct new postfix tuple, containing all nodes of innerTuple with
1773 : : * same node datums, but with the prefix specified by the picksplit
1774 : : * function.
1775 : : */
4502 tgl@sss.pgh.pa.us 1776 :CBC 323 : nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1777 [ + + ]: 1143 : SGITITERATE(innerTuple, i, node)
1778 : : {
1779 : 820 : nodes[i] = node;
1780 : : }
1781 : :
1782 : 323 : postfixTuple = spgFormInnerTuple(state,
1783 : 323 : out->result.splitTuple.postfixHasPrefix,
1784 : : out->result.splitTuple.postfixPrefixDatum,
1785 : 323 : innerTuple->nNodes, nodes);
1786 : :
1787 : : /* Postfix tuple is allTheSame if original tuple was */
1788 : 323 : postfixTuple->allTheSame = innerTuple->allTheSame;
1789 : :
1790 : : /* prep data for WAL record */
1791 : 323 : xlrec.newPage = false;
1792 : :
1793 : : /*
1794 : : * If we can't fit both tuples on the current page, get a new page for the
1795 : : * postfix tuple. In particular, can't split to the root page.
1796 : : *
1797 : : * For the space calculation, note that prefixTuple replaces innerTuple
1798 : : * but postfixTuple will be a new entry.
1799 : : */
4417 1800 [ + + + - ]: 323 : if (SpGistBlockIsRoot(current->blkno) ||
4502 1801 [ + - ]: 317 : SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1802 [ + + ]: 317 : prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1803 : : {
1804 : : /*
1805 : : * Choose page with next triple parity, because postfix tuple is a
1806 : : * child of prefix one
1807 : : */
1808 : 84 : newBuffer = SpGistGetBuffer(index,
1809 : 84 : GBUF_INNER_PARITY(current->blkno + 1),
1810 : 84 : postfixTuple->size + sizeof(ItemIdData),
1811 : : &xlrec.newPage);
1812 : : }
1813 : :
1814 : 323 : START_CRIT_SECTION();
1815 : :
1816 : : /*
1817 : : * Replace old tuple by prefix tuple
1818 : : */
1819 : 323 : PageIndexTupleDelete(current->page, current->offnum);
1820 : 323 : xlrec.offnumPrefix = PageAddItem(current->page,
1821 : : (Item) prefixTuple, prefixTuple->size,
1822 : : current->offnum, false, false);
1823 [ - + ]: 323 : if (xlrec.offnumPrefix != current->offnum)
4502 tgl@sss.pgh.pa.us 1824 [ # # ]:UBC 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1825 : : prefixTuple->size);
1826 : :
1827 : : /*
1828 : : * put postfix tuple into appropriate page
1829 : : */
4502 tgl@sss.pgh.pa.us 1830 [ + + ]:CBC 323 : if (newBuffer == InvalidBuffer)
1831 : : {
3433 heikki.linnakangas@i 1832 : 239 : postfixBlkno = current->blkno;
4502 tgl@sss.pgh.pa.us 1833 : 239 : xlrec.offnumPostfix = postfixOffset =
1834 : 239 : SpGistPageAddNewItem(state, current->page,
1835 : 239 : (Item) postfixTuple, postfixTuple->size,
1836 : : NULL, false);
3433 heikki.linnakangas@i 1837 : 239 : xlrec.postfixBlkSame = true;
1838 : : }
1839 : : else
1840 : : {
1841 : 84 : postfixBlkno = BufferGetBlockNumber(newBuffer);
4502 tgl@sss.pgh.pa.us 1842 : 84 : xlrec.offnumPostfix = postfixOffset =
2916 kgrittn@postgresql.o 1843 : 84 : SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
4502 tgl@sss.pgh.pa.us 1844 : 84 : (Item) postfixTuple, postfixTuple->size,
1845 : : NULL, false);
1846 : 84 : MarkBufferDirty(newBuffer);
3433 heikki.linnakangas@i 1847 : 84 : xlrec.postfixBlkSame = false;
1848 : : }
1849 : :
1850 : : /*
1851 : : * And set downlink pointer in the prefix tuple to point to postfix tuple.
1852 : : * (We can't avoid this step by doing the above two steps in opposite
1853 : : * order, because there might not be enough space on the page to insert
1854 : : * the postfix tuple first.) We have to update the local copy of the
1855 : : * prefixTuple too, because that's what will be written to WAL.
1856 : : */
2791 tgl@sss.pgh.pa.us 1857 : 323 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1858 : : postfixBlkno, postfixOffset);
4502 1859 : 323 : prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
2489 1860 : 323 : PageGetItemId(current->page, current->offnum));
2791 1861 : 323 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1862 : : postfixBlkno, postfixOffset);
1863 : :
4502 1864 : 323 : MarkBufferDirty(current->buffer);
1865 : :
1838 heikki.linnakangas@i 1866 [ + - + + : 323 : if (RelationNeedsWAL(index) && !state->isBuild)
+ + + - +
+ ]
1867 : : {
1868 : : XLogRecPtr recptr;
1869 : :
3433 1870 : 306 : XLogBeginInsert();
1871 : 306 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1872 : 306 : XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1873 : 306 : XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1874 : :
1875 : 306 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1876 [ + + ]: 306 : if (newBuffer != InvalidBuffer)
1877 : : {
1878 : : int flags;
1879 : :
1880 : 81 : flags = REGBUF_STANDARD;
1881 [ + + ]: 81 : if (xlrec.newPage)
1882 : 3 : flags |= REGBUF_WILL_INIT;
1883 : 81 : XLogRegisterBuffer(1, newBuffer, flags);
1884 : : }
1885 : :
1886 : 306 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1887 : :
4502 tgl@sss.pgh.pa.us 1888 : 306 : PageSetLSN(current->page, recptr);
1889 : :
1890 [ + + ]: 306 : if (newBuffer != InvalidBuffer)
1891 : : {
2916 kgrittn@postgresql.o 1892 : 81 : PageSetLSN(BufferGetPage(newBuffer), recptr);
1893 : : }
1894 : : }
1895 : :
4502 tgl@sss.pgh.pa.us 1896 [ - + ]: 323 : END_CRIT_SECTION();
1897 : :
1898 : : /* Update local free-space cache and release buffer */
1899 [ + + ]: 323 : if (newBuffer != InvalidBuffer)
1900 : : {
1901 : 84 : SpGistSetLastUsedPage(index, newBuffer);
1902 : 84 : UnlockReleaseBuffer(newBuffer);
1903 : : }
1904 : 323 : }
1905 : :
1906 : : /*
1907 : : * Insert one item into the index.
1908 : : *
1909 : : * Returns true on success, false if we failed to complete the insertion
1910 : : * (typically because of conflict with a concurrent insert). In the latter
1911 : : * case, caller should re-call spgdoinsert() with the same args.
1912 : : */
1913 : : bool
1914 : 402681 : spgdoinsert(Relation index, SpGistState *state,
1915 : : ItemPointer heapPtr, Datum *datums, bool *isnulls)
1916 : : {
1066 1917 : 402681 : bool result = true;
1105 1918 : 402681 : TupleDesc leafDescriptor = state->leafTupDesc;
1919 : 402681 : bool isnull = isnulls[spgKeyColumn];
4502 1920 : 402681 : int level = 0;
1921 : : Datum leafDatums[INDEX_MAX_KEYS];
1922 : : int leafSize;
1923 : : int bestLeafSize;
1066 1924 : 402681 : int numNoProgressCycles = 0;
1925 : : SPPageDesc current,
1926 : : parent;
4246 heikki.linnakangas@i 1927 : 402681 : FmgrInfo *procinfo = NULL;
1928 : :
1929 : : /*
1930 : : * Look up FmgrInfo of the user-defined choose function once, to save
1931 : : * cycles in the loop below.
1932 : : */
1933 [ + + ]: 402681 : if (!isnull)
1934 : 402636 : procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1935 : :
1936 : : /*
1937 : : * Prepare the leaf datum to insert.
1938 : : *
1939 : : * If an optional "compress" method is provided, then call it to form the
1940 : : * leaf key datum from the input datum. Otherwise, store the input datum
1941 : : * as is. Since we don't use index_form_tuple in this AM, we have to make
1942 : : * sure value to be inserted is not toasted; FormIndexDatum doesn't
1943 : : * guarantee that. But we assume the "compress" method to return an
1944 : : * untoasted value.
1945 : : */
2305 teodor@sigaev.ru 1946 [ + + ]: 402681 : if (!isnull)
1947 : : {
1948 [ + + ]: 402636 : if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1949 : : {
1950 : 39674 : FmgrInfo *compressProcinfo = NULL;
1951 : :
1952 : 39674 : compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1105 tgl@sss.pgh.pa.us 1953 : 39674 : leafDatums[spgKeyColumn] =
1954 : 39674 : FunctionCall1Coll(compressProcinfo,
1955 : 39674 : index->rd_indcollation[spgKeyColumn],
1956 : : datums[spgKeyColumn]);
1957 : : }
1958 : : else
1959 : : {
2305 teodor@sigaev.ru 1960 [ - + ]: 362962 : Assert(state->attLeafType.type == state->attType.type);
1961 : :
1962 [ + + ]: 362962 : if (state->attType.attlen == -1)
1105 tgl@sss.pgh.pa.us 1963 : 89736 : leafDatums[spgKeyColumn] =
1964 : 89736 : PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1965 : : else
1966 : 273226 : leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1967 : : }
1968 : : }
1969 : : else
1970 : 45 : leafDatums[spgKeyColumn] = (Datum) 0;
1971 : :
1972 : : /* Likewise, ensure that any INCLUDE values are not toasted */
1973 [ + + ]: 449038 : for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1974 : : {
1975 [ + + ]: 46357 : if (!isnulls[i])
1976 : : {
1977 [ + + ]: 42896 : if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
1978 : 6674 : leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1979 : : else
1980 : 36222 : leafDatums[i] = datums[i];
1981 : : }
1982 : : else
1983 : 3461 : leafDatums[i] = (Datum) 0;
1984 : : }
1985 : :
1986 : : /*
1987 : : * Compute space needed for a leaf tuple containing the given data.
1988 : : */
1989 : 402681 : leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1990 : : /* Account for an item pointer, too */
1109 1991 : 402681 : leafSize += sizeof(ItemIdData);
1992 : :
1993 : : /*
1994 : : * If it isn't gonna fit, and the opclass can't reduce the datum size by
1995 : : * suffixing, bail out now rather than doing a lot of useless work.
1996 : : */
1066 1997 [ + + + - ]: 402681 : if (leafSize > SPGIST_PAGE_CAPACITY &&
1998 [ - + ]: 2 : (isnull || !state->config.longValuesOK))
4502 tgl@sss.pgh.pa.us 1999 [ # # ]:UBC 0 : ereport(ERROR,
2000 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2001 : : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2002 : : leafSize - sizeof(ItemIdData),
2003 : : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2004 : : RelationGetRelationName(index)),
2005 : : errhint("Values larger than a buffer page cannot be indexed.")));
1066 tgl@sss.pgh.pa.us 2006 :CBC 402681 : bestLeafSize = leafSize;
2007 : :
2008 : : /* Initialize "current" to the appropriate root page */
4417 2009 [ + + ]: 402681 : current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
4502 2010 : 402681 : current.buffer = InvalidBuffer;
2011 : 402681 : current.page = NULL;
2012 : 402681 : current.offnum = FirstOffsetNumber;
2013 : 402681 : current.node = -1;
2014 : :
2015 : : /* "parent" is invalid for the moment */
2016 : 402681 : parent.blkno = InvalidBlockNumber;
2017 : 402681 : parent.buffer = InvalidBuffer;
2018 : 402681 : parent.page = NULL;
2019 : 402681 : parent.offnum = InvalidOffsetNumber;
2020 : 402681 : parent.node = -1;
2021 : :
2022 : : /*
2023 : : * Before entering the loop, try to clear any pending interrupt condition.
2024 : : * If a query cancel is pending, we might as well accept it now not later;
2025 : : * while if a non-canceling condition is pending, servicing it here avoids
2026 : : * having to restart the insertion and redo all the work so far.
2027 : : */
1066 2028 [ - + ]: 402681 : CHECK_FOR_INTERRUPTS();
2029 : :
2030 : : for (;;)
4502 2031 : 9328409 : {
2032 : 9731090 : bool isNew = false;
2033 : :
2034 : : /*
2035 : : * Bail out if query cancel is pending. We must have this somewhere
2036 : : * in the loop since a broken opclass could produce an infinite
2037 : : * picksplit loop. However, because we'll be holding buffer lock(s)
2038 : : * after the first iteration, ProcessInterrupts() wouldn't be able to
2039 : : * throw a cancel error here. Hence, if we see that an interrupt is
2040 : : * pending, break out of the loop and deal with the situation below.
2041 : : * Set result = false because we must restart the insertion if the
2042 : : * interrupt isn't a query-cancel-or-die case.
2043 : : */
1066 2044 [ - + ]: 9731090 : if (INTERRUPTS_PENDING_CONDITION())
2045 : : {
1066 tgl@sss.pgh.pa.us 2046 :UBC 0 : result = false;
2047 : 0 : break;
2048 : : }
2049 : :
4502 tgl@sss.pgh.pa.us 2050 [ + + ]:CBC 9731090 : if (current.blkno == InvalidBlockNumber)
2051 : : {
2052 : : /*
2053 : : * Create a leaf page. If leafSize is too large to fit on a page,
2054 : : * we won't actually use the page yet, but it simplifies the API
2055 : : * for doPickSplit to always have a leaf page at hand; so just
2056 : : * quietly limit our request to a page size.
2057 : : */
4417 2058 : 1040 : current.buffer =
2059 [ - + ]: 1040 : SpGistGetBuffer(index,
2060 : : GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2061 [ + + ]: 1040 : Min(leafSize, SPGIST_PAGE_CAPACITY),
2062 : : &isNew);
4502 2063 : 1040 : current.blkno = BufferGetBlockNumber(current.buffer);
2064 : : }
3957 2065 [ + + ]: 9730050 : else if (parent.buffer == InvalidBuffer)
2066 : : {
2067 : : /* we hold no parent-page lock, so no deadlock is possible */
4502 2068 : 402681 : current.buffer = ReadBuffer(index, current.blkno);
2069 : 402681 : LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2070 : : }
3957 2071 [ + + ]: 9327369 : else if (current.blkno != parent.blkno)
2072 : : {
2073 : : /* descend to a new child page */
2074 : 785141 : current.buffer = ReadBuffer(index, current.blkno);
2075 : :
2076 : : /*
2077 : : * Attempt to acquire lock on child page. We must beware of
2078 : : * deadlock against another insertion process descending from that
2079 : : * page to our parent page (see README). If we fail to get lock,
2080 : : * abandon the insertion and tell our caller to start over.
2081 : : *
2082 : : * XXX this could be improved, because failing to get lock on a
2083 : : * buffer is not proof of a deadlock situation; the lock might be
2084 : : * held by a reader, or even just background writer/checkpointer
2085 : : * process. Perhaps it'd be worth retrying after sleeping a bit?
2086 : : */
2087 [ - + ]: 785141 : if (!ConditionalLockBuffer(current.buffer))
2088 : : {
3957 tgl@sss.pgh.pa.us 2089 :UBC 0 : ReleaseBuffer(current.buffer);
2090 : 0 : UnlockReleaseBuffer(parent.buffer);
2091 : 0 : return false;
2092 : : }
2093 : : }
2094 : : else
2095 : : {
2096 : : /* inner tuple can be stored on the same page as parent one */
4502 tgl@sss.pgh.pa.us 2097 :CBC 8542228 : current.buffer = parent.buffer;
2098 : : }
2916 kgrittn@postgresql.o 2099 : 9731090 : current.page = BufferGetPage(current.buffer);
2100 : :
2101 : : /* should not arrive at a page of the wrong type */
4417 tgl@sss.pgh.pa.us 2102 [ + + - + ]: 19462135 : if (isnull ? !SpGistPageStoresNulls(current.page) :
2103 : 9731045 : SpGistPageStoresNulls(current.page))
4417 tgl@sss.pgh.pa.us 2104 [ # # ]:UBC 0 : elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2105 : : current.blkno);
2106 : :
4502 tgl@sss.pgh.pa.us 2107 [ + + ]:CBC 9731090 : if (SpGistPageIsLeaf(current.page))
2108 : : {
2109 : : SpGistLeafTuple leafTuple;
2110 : : int nToSplit,
2111 : : sizeToSplit;
2112 : :
1105 2113 : 402821 : leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
4502 2114 : 805642 : if (leafTuple->size + sizeof(ItemIdData) <=
2115 [ + + + + ]: 402821 : SpGistPageGetFreeSpace(current.page, 1))
2116 : : {
2117 : : /* it fits on page, so insert it and we're done */
2118 : 398845 : addLeafTuple(index, state, leafTuple,
2119 : : ¤t, &parent, isnull, isNew);
2120 : 402679 : break;
2121 : : }
2122 [ + + ]: 3976 : else if ((sizeToSplit =
2123 : 3976 : checkSplitConditions(index, state, ¤t,
2489 2124 : 1833 : &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
4502 2125 [ + + ]: 1833 : nToSplit < 64 &&
2126 [ + + ]: 1170 : leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2127 : : {
2128 : : /*
2129 : : * the amount of data is pretty small, so just move the whole
2130 : : * chain to another leaf page rather than splitting it.
2131 : : */
2132 [ - + ]: 1146 : Assert(!isNew);
4417 2133 : 1146 : moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
4502 2134 : 1146 : break; /* we're done */
2135 : : }
2136 : : else
2137 : : {
2138 : : /* picksplit */
2139 [ + + ]: 2830 : if (doPickSplit(index, state, ¤t, &parent,
2140 : : leafTuple, level, isnull, isNew))
2141 : 2688 : break; /* doPickSplit installed new tuples */
2142 : :
2143 : : /* leaf tuple will not be inserted yet */
2144 : 142 : pfree(leafTuple);
2145 : :
2146 : : /*
2147 : : * current now describes new inner tuple, go insert into it
2148 : : */
2149 [ - + ]: 142 : Assert(!SpGistPageIsLeaf(current.page));
2150 : 142 : goto process_inner_tuple;
2151 : : }
2152 : : }
2153 : : else /* non-leaf page */
2154 : : {
2155 : : /*
2156 : : * Apply the opclass choose function to figure out how to insert
2157 : : * the given datum into the current inner tuple.
2158 : : */
2159 : : SpGistInnerTuple innerTuple;
2160 : : spgChooseIn in;
2161 : : spgChooseOut out;
2162 : :
2163 : : /*
2164 : : * spgAddNode and spgSplitTuple cases will loop back to here to
2165 : : * complete the insertion operation. Just in case the choose
2166 : : * function is broken and produces add or split requests
2167 : : * repeatedly, check for query cancel (see comments above).
2168 : : */
2169 : 9329381 : process_inner_tuple:
1066 2170 [ - + ]: 9329523 : if (INTERRUPTS_PENDING_CONDITION())
2171 : : {
1066 tgl@sss.pgh.pa.us 2172 :UBC 0 : result = false;
2173 : 0 : break;
2174 : : }
2175 : :
4502 tgl@sss.pgh.pa.us 2176 :CBC 9329523 : innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2489 2177 : 9329523 : PageGetItemId(current.page, current.offnum));
2178 : :
1105 2179 : 9329523 : in.datum = datums[spgKeyColumn];
2180 : 9329523 : in.leafDatum = leafDatums[spgKeyColumn];
4502 2181 : 9329523 : in.level = level;
2182 : 9329523 : in.allTheSame = innerTuple->allTheSame;
2183 : 9329523 : in.hasPrefix = (innerTuple->prefixSize > 0);
2184 [ + + + + ]: 9329523 : in.prefixDatum = SGITDATUM(innerTuple, state);
2185 : 9329523 : in.nNodes = innerTuple->nNodes;
2186 : 9329523 : in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2187 : :
2188 : 9329523 : memset(&out, 0, sizeof(out));
2189 : :
4417 2190 [ + - ]: 9329523 : if (!isnull)
2191 : : {
2192 : : /* use user-defined choose method */
2193 : 9329523 : FunctionCall2Coll(procinfo,
2194 : 9329523 : index->rd_indcollation[0],
2195 : : PointerGetDatum(&in),
2196 : : PointerGetDatum(&out));
2197 : : }
2198 : : else
2199 : : {
2200 : : /* force "match" action (to insert to random subnode) */
4417 tgl@sss.pgh.pa.us 2201 :UBC 0 : out.resultType = spgMatchNode;
2202 : : }
2203 : :
4502 tgl@sss.pgh.pa.us 2204 [ + + ]:CBC 9329523 : if (innerTuple->allTheSame)
2205 : : {
2206 : : /*
2207 : : * It's not allowed to do an AddNode at an allTheSame tuple.
2208 : : * Opclass must say "match", in which case we choose a random
2209 : : * one of the nodes to descend into, or "split".
2210 : : */
2211 [ - + ]: 72535 : if (out.resultType == spgAddNode)
4502 tgl@sss.pgh.pa.us 2212 [ # # ]:UBC 0 : elog(ERROR, "cannot add a node to an allTheSame inner tuple");
4502 tgl@sss.pgh.pa.us 2213 [ + + ]:CBC 72535 : else if (out.resultType == spgMatchNode)
868 2214 : 72529 : out.result.matchNode.nodeN =
2215 : 72529 : pg_prng_uint64_range(&pg_global_prng_state,
2216 : 72529 : 0, innerTuple->nNodes - 1);
2217 : : }
2218 : :
4502 2219 [ + + + - ]: 9329523 : switch (out.resultType)
2220 : : {
2221 : 9328411 : case spgMatchNode:
2222 : : /* Descend to N'th child node */
2223 : 9328411 : spgMatchNodeAction(index, state, innerTuple,
2224 : : ¤t, &parent,
2225 : : out.result.matchNode.nodeN);
2226 : : /* Adjust level as per opclass request */
2227 : 9328411 : level += out.result.matchNode.levelAdd;
2228 : : /* Replace leafDatum and recompute leafSize */
4417 2229 [ + - ]: 9328411 : if (!isnull)
2230 : : {
1105 2231 : 9328411 : leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2232 : 9328411 : leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2233 : : leafDatums, isnulls);
1109 2234 : 9328411 : leafSize += sizeof(ItemIdData);
2235 : : }
2236 : :
2237 : : /*
2238 : : * Check new tuple size; fail if it can't fit, unless the
2239 : : * opclass says it can handle the situation by suffixing.
2240 : : *
2241 : : * However, the opclass can only shorten the leaf datum,
2242 : : * which may not be enough to ever make the tuple fit,
2243 : : * since INCLUDE columns might alone use more than a page.
2244 : : * Depending on the opclass' behavior, that could lead to
2245 : : * an infinite loop --- spgtextproc.c, for example, will
2246 : : * just repeatedly generate an empty-string leaf datum
2247 : : * once it runs out of data. Actual bugs in opclasses
2248 : : * might cause infinite looping, too. To detect such a
2249 : : * loop, check to see if we are making progress by
2250 : : * reducing the leafSize in each pass. This is a bit
2251 : : * tricky though. Because of alignment considerations,
2252 : : * the total tuple size might not decrease on every pass.
2253 : : * Also, there are edge cases where the choose method
2254 : : * might seem to not make progress for a cycle or two.
2255 : : * Somewhat arbitrarily, we allow up to 10 no-progress
2256 : : * iterations before failing. (This limit should be more
2257 : : * than MAXALIGN, to accommodate opclasses that trim one
2258 : : * byte from the leaf datum per pass.)
2259 : : */
1066 2260 [ + + ]: 9328411 : if (leafSize > SPGIST_PAGE_CAPACITY)
2261 : : {
2262 : 26 : bool ok = false;
2263 : :
2264 [ + - + - ]: 26 : if (state->config.longValuesOK && !isnull)
2265 : : {
2266 [ + + ]: 26 : if (leafSize < bestLeafSize)
2267 : : {
2268 : 2 : ok = true;
2269 : 2 : bestLeafSize = leafSize;
2270 : 2 : numNoProgressCycles = 0;
2271 : : }
2272 [ + + ]: 24 : else if (++numNoProgressCycles < 10)
2273 : 22 : ok = true;
2274 : : }
2275 [ + + ]: 26 : if (!ok)
2276 [ + - ]: 2 : ereport(ERROR,
2277 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2278 : : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2279 : : leafSize - sizeof(ItemIdData),
2280 : : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2281 : : RelationGetRelationName(index)),
2282 : : errhint("Values larger than a buffer page cannot be indexed.")));
2283 : : }
2284 : :
2285 : : /*
2286 : : * Loop around and attempt to insert the new leafDatum at
2287 : : * "current" (which might reference an existing child
2288 : : * tuple, or might be invalid to force us to find a new
2289 : : * page for the tuple).
2290 : : */
4502 2291 : 9328409 : break;
2292 : 789 : case spgAddNode:
2293 : : /* AddNode is not sensible if nodes don't have labels */
2294 [ - + ]: 789 : if (in.nodeLabels == NULL)
4502 tgl@sss.pgh.pa.us 2295 [ # # ]:UBC 0 : elog(ERROR, "cannot add a node to an inner tuple without node labels");
2296 : : /* Add node to inner tuple, per request */
4502 tgl@sss.pgh.pa.us 2297 :CBC 789 : spgAddNodeAction(index, state, innerTuple,
2298 : : ¤t, &parent,
2299 : : out.result.addNode.nodeN,
2300 : : out.result.addNode.nodeLabel);
2301 : :
2302 : : /*
2303 : : * Retry insertion into the enlarged node. We assume that
2304 : : * we'll get a MatchNode result this time.
2305 : : */
2306 : 789 : goto process_inner_tuple;
2307 : : break;
2308 : 323 : case spgSplitTuple:
2309 : : /* Split inner tuple, per request */
2310 : 323 : spgSplitNodeAction(index, state, innerTuple,
2311 : : ¤t, &out);
2312 : :
2313 : : /* Retry insertion into the split node */
2314 : 323 : goto process_inner_tuple;
2315 : : break;
4502 tgl@sss.pgh.pa.us 2316 :UBC 0 : default:
2317 [ # # ]: 0 : elog(ERROR, "unrecognized SPGiST choose result: %d",
2318 : : (int) out.resultType);
2319 : : break;
2320 : : }
2321 : : }
2322 : : } /* end loop */
2323 : :
2324 : : /*
2325 : : * Release any buffers we're still holding. Beware of possibility that
2326 : : * current and parent reference same buffer.
2327 : : */
4502 tgl@sss.pgh.pa.us 2328 [ + - ]:CBC 402679 : if (current.buffer != InvalidBuffer)
2329 : : {
2330 : 402679 : SpGistSetLastUsedPage(index, current.buffer);
2331 : 402679 : UnlockReleaseBuffer(current.buffer);
2332 : : }
2333 [ + + ]: 402679 : if (parent.buffer != InvalidBuffer &&
2334 [ + + ]: 394190 : parent.buffer != current.buffer)
2335 : : {
2336 : 391890 : SpGistSetLastUsedPage(index, parent.buffer);
2337 : 391890 : UnlockReleaseBuffer(parent.buffer);
2338 : : }
2339 : :
2340 : : /*
2341 : : * We do not support being called while some outer function is holding a
2342 : : * buffer lock (or any other reason to postpone query cancels). If that
2343 : : * were the case, telling the caller to retry would create an infinite
2344 : : * loop.
2345 : : */
1066 2346 [ + - + - : 402679 : Assert(INTERRUPTS_CAN_BE_PROCESSED());
- + ]
2347 : :
2348 : : /*
2349 : : * Finally, check for interrupts again. If there was a query cancel,
2350 : : * ProcessInterrupts() will be able to throw the error here. If it was
2351 : : * some other kind of interrupt that can just be cleared, return false to
2352 : : * tell our caller to retry.
2353 : : */
2354 [ - + ]: 402679 : CHECK_FOR_INTERRUPTS();
2355 : :
2356 : 402679 : return result;
2357 : : }
|