Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * ginfast.c
4 : * Fast insert routines for the Postgres inverted index access method.
5 : * Pending entries are stored in linear list of pages. Later on
6 : * (typically during VACUUM), ginInsertCleanup() will be invoked to
7 : * transfer pending entries into the regular index structure. This
8 : * wins because bulk insertion is much more efficient than retail.
9 : *
10 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
11 : * Portions Copyright (c) 1994, Regents of the University of California
12 : *
13 : * IDENTIFICATION
14 : * src/backend/access/gin/ginfast.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "access/gin_private.h"
22 : #include "access/ginxlog.h"
23 : #include "access/xlog.h"
24 : #include "access/xloginsert.h"
25 : #include "catalog/pg_am.h"
26 : #include "commands/vacuum.h"
27 : #include "miscadmin.h"
28 : #include "port/pg_bitutils.h"
29 : #include "postmaster/autovacuum.h"
30 : #include "storage/indexfsm.h"
31 : #include "storage/lmgr.h"
32 : #include "storage/predicate.h"
33 : #include "utils/acl.h"
34 : #include "utils/builtins.h"
35 : #include "utils/memutils.h"
36 : #include "utils/rel.h"
37 :
38 : /* GUC parameter */
39 : int gin_pending_list_limit = 0;
40 :
41 : #define GIN_PAGE_FREESIZE \
42 : ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
43 :
/*
 * Workspace for accumulating the keys belonging to one (heap tuple, column)
 * pair before they are handed to ginInsertBAEntries() (see
 * processPendingPage).  The two arrays are kept parallel:
 * categories[i] describes keys[i].
 */
typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;
51 :
52 :
53 : /*
54 : * Build a pending-list page from the given array of tuples, and write it out.
55 : *
56 : * Returns amount of free space left on the page.
57 : */
static int32
writeListPage(Relation index, Buffer buffer,
			  IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
	Page		page = BufferGetPage(buffer);
	int32		i,
				freesize,
				size = 0;
	OffsetNumber l,
				off;
	PGAlignedBlock workspace;	/* scratch copy of tuple data for WAL */
	char	   *ptr;

	/*
	 * NOTE(review): buffer is assumed to arrive pinned and exclusive-locked
	 * (callers obtain it via GinNewBuffer) — it is unlocked and released
	 * before returning.  The whole page build runs inside a critical
	 * section because the WAL record must match the page contents.
	 */
	START_CRIT_SECTION();

	GinInitBuffer(buffer, GIN_LIST);

	off = FirstOffsetNumber;
	ptr = workspace.data;

	for (i = 0; i < ntuples; i++)
	{
		int			this_size = IndexTupleSize(tuples[i]);

		/* accumulate a contiguous image of all tuples for the WAL record */
		memcpy(ptr, tuples[i], this_size);
		ptr += this_size;
		size += this_size;

		l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to index page in \"%s\"",
				 RelationGetRelationName(index));

		off++;
	}

	Assert(size <= BLCKSZ);		/* else we overran workspace */

	GinPageGetOpaque(page)->rightlink = rightlink;

	/*
	 * tail page may contain only whole row(s) or final part of row placed on
	 * previous pages (a "row" here meaning all the index tuples generated for
	 * one heap tuple)
	 */
	if (rightlink == InvalidBlockNumber)
	{
		/* this is the tail page: it holds one complete row */
		GinPageSetFullRow(page);
		GinPageGetOpaque(page)->maxoff = 1;
	}
	else
	{
		/* non-tail page: maxoff counts completed rows, none here */
		GinPageGetOpaque(page)->maxoff = 0;
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		ginxlogInsertListPage data;
		XLogRecPtr	recptr;

		data.rightlink = rightlink;
		data.ntuples = ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

		/* page is re-initialized on replay, so log full tuple data */
		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, workspace.data, size);

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
		PageSetLSN(page, recptr);
	}

	/* get free space before releasing buffer */
	freesize = PageGetExactFreeSpace(page);

	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return freesize;
}
143 :
/*
 * Build a chain of pending-list pages holding the given tuples, and fill
 * *res with the sublist's metadata: head/tail block numbers, free space
 * remaining on the tail page, and the count of pages written.
 *
 * All the tuples are assumed to come from a single heap tuple, hence
 * nPendingHeapTuples is set to 1 (see trailing comment below).
 */
static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,
				tupsize;
	int			startTuple = 0;	/* first tuple destined for current page */

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			/*
			 * Starting a new page.  Allocate it before flushing the previous
			 * one, so the previous page can be written with a valid
			 * rightlink to it.
			 */
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				/* very first page becomes the sublist head */
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		/* size as it will be consumed on the page: aligned tuple + line pointer */
		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/* won't fit, force a new page and reprocess */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}
210 :
211 : /*
212 : * Write the index tuples contained in *collector into the index's
213 : * pending list.
214 : *
215 : * Function guarantees that all these tuples will be inserted consecutively,
216 : * preserving order
217 : */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
	Relation	index = ginstate->index;
	Buffer		metabuffer;
	Page		metapage;
	GinMetaPageData *metadata = NULL;
	Buffer		buffer = InvalidBuffer;
	Page		page = NULL;
	ginxlogUpdateMeta data;
	bool		separateList = false;
	bool		needCleanup = false;
	int			cleanupSize;
	bool		needWal;

	if (collector->ntuples == 0)
		return;

	needWal = RelationNeedsWAL(index);

	/* pre-fill the WAL record skeleton; branch-specific fields set below */
	data.locator = index->rd_locator;
	data.ntuples = 0;
	data.newRightlink = data.prevTail = InvalidBlockNumber;

	metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
	metapage = BufferGetPage(metabuffer);

	/*
	 * An insertion to the pending list could logically belong anywhere in the
	 * tree, so it conflicts with all serializable scans.  All scans acquire a
	 * predicate lock on the metabuffer to represent that.
	 */
	CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);

	if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
	{
		/*
		 * Total size is greater than one page => make sublist
		 */
		separateList = true;
	}
	else
	{
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber ||
			collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
		{
			/*
			 * Pending list is empty or total size is greater than freespace
			 * on tail page => make sublist
			 *
			 * We unlock metabuffer to keep high concurrency
			 */
			separateList = true;
			LockBuffer(metabuffer, GIN_UNLOCK);
		}
	}

	if (separateList)
	{
		/*
		 * We should make sublist separately and append it to the tail
		 */
		GinMetaPageData sublist;

		memset(&sublist, 0, sizeof(GinMetaPageData));
		makeSublist(index, collector->tuples, collector->ntuples, &sublist);

		/*
		 * metapage was unlocked, see above
		 */
		LockBuffer(metabuffer, GIN_EXCLUSIVE);
		metadata = GinPageGetMeta(metapage);

		if (metadata->head == InvalidBlockNumber)
		{
			/*
			 * Main list is empty, so just insert sublist as main list
			 */
			START_CRIT_SECTION();

			metadata->head = sublist.head;
			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages = sublist.nPendingPages;
			metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;

			if (needWal)
				XLogBeginInsert();
		}
		else
		{
			/*
			 * Merge lists: link the old tail page to the sublist head and
			 * make the sublist tail the new list tail.
			 */
			data.prevTail = metadata->tail;
			data.newRightlink = sublist.head;

			buffer = ReadBuffer(index, metadata->tail);
			LockBuffer(buffer, GIN_EXCLUSIVE);
			page = BufferGetPage(buffer);

			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

			START_CRIT_SECTION();

			GinPageGetOpaque(page)->rightlink = sublist.head;

			MarkBufferDirty(buffer);

			metadata->tail = sublist.tail;
			metadata->tailFreeSize = sublist.tailFreeSize;

			metadata->nPendingPages += sublist.nPendingPages;
			metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

			if (needWal)
			{
				XLogBeginInsert();
				XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			}
		}
	}
	else
	{
		/*
		 * Insert into tail page.  Metapage is already locked
		 */
		OffsetNumber l,
					off;
		int			i,
					tupsize;
		char	   *ptr;
		char	   *collectordata;

		buffer = ReadBuffer(index, metadata->tail);
		LockBuffer(buffer, GIN_EXCLUSIVE);
		page = BufferGetPage(buffer);

		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

		/* palloc before entering the critical section */
		collectordata = ptr = (char *) palloc(collector->sumsize);

		data.ntuples = collector->ntuples;

		START_CRIT_SECTION();

		if (needWal)
			XLogBeginInsert();

		/*
		 * Increase counter of heap tuples
		 */
		Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
		GinPageGetOpaque(page)->maxoff++;
		metadata->nPendingHeapTuples++;

		for (i = 0; i < collector->ntuples; i++)
		{
			tupsize = IndexTupleSize(collector->tuples[i]);
			l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

			if (l == InvalidOffsetNumber)
				elog(ERROR, "failed to add item to index page in \"%s\"",
					 RelationGetRelationName(index));

			/* also collect a contiguous copy of the tuples for the WAL record */
			memcpy(ptr, collector->tuples[i], tupsize);
			ptr += tupsize;

			off++;
		}

		Assert((ptr - collectordata) <= collector->sumsize);
		if (needWal)
		{
			XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(1, collectordata, collector->sumsize);
		}

		metadata->tailFreeSize = PageGetExactFreeSpace(page);

		MarkBufferDirty(buffer);
	}

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.  (We must do this here because pre-v11 versions of PG did not
	 * set the metapage's pd_lower correctly, so a pg_upgraded index might
	 * contain the wrong value.)
	 */
	((PageHeader) metapage)->pd_lower =
		((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

	/*
	 * Write metabuffer, make xlog entry
	 */
	MarkBufferDirty(metabuffer);

	if (needWal)
	{
		XLogRecPtr	recptr;

		memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

		XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
		XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
		PageSetLSN(metapage, recptr);

		/* buffer is only valid in the merge-lists and tail-insert paths */
		if (buffer != InvalidBuffer)
		{
			PageSetLSN(page, recptr);
		}
	}

	if (buffer != InvalidBuffer)
		UnlockReleaseBuffer(buffer);

	/*
	 * Force pending list cleanup when it becomes too long. And,
	 * ginInsertCleanup could take significant amount of time, so we prefer to
	 * call it when it can do all the work in a single collection cycle. In
	 * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
	 * while pending list is still small enough to fit into
	 * gin_pending_list_limit.
	 *
	 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
	 */
	cleanupSize = GinGetPendingListCleanupSize(index);
	if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
		needCleanup = true;

	UnlockReleaseBuffer(metabuffer);

	END_CRIT_SECTION();

	/*
	 * Since it could contend with concurrent cleanup process we cleanup
	 * pending list not forcibly.
	 */
	if (needCleanup)
		ginInsertCleanup(ginstate, false, true, false, NULL);
}
467 :
468 : /*
469 : * Create temporary index tuples for a single indexable item (one index column
470 : * for the heap tuple specified by ht_ctid), and append them to the array
471 : * in *collector. They will subsequently be written out using
472 : * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
473 : * temp tuples for a given heap tuple must be written in one call to
474 : * ginHeapTupleFastInsert.
475 : */
476 : void
4475 tgl 477 CBC 191934 : ginHeapTupleFastCollect(GinState *ginstate,
478 : GinTupleCollector *collector,
479 : OffsetNumber attnum, Datum value, bool isNull,
480 : ItemPointer ht_ctid)
481 : {
482 : Datum *entries;
483 : GinNullCategory *categories;
484 : int32 i,
485 : nentries;
486 :
487 : /*
488 : * Extract the key values that need to be inserted in the index
489 : */
490 191934 : entries = ginExtractEntries(ginstate, attnum, value, isNull,
491 : &nentries, &categories);
492 :
493 : /*
494 : * Protect against integer overflow in allocation calculations
495 : */
1572 496 191934 : if (nentries < 0 ||
497 191934 : collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
1572 tgl 498 UBC 0 : elog(ERROR, "too many entries for GIN index");
499 :
500 : /*
501 : * Allocate/reallocate memory for storing collected tuples
502 : */
5050 bruce 503 CBC 191934 : if (collector->tuples == NULL)
504 : {
505 : /*
506 : * Determine the number of elements to allocate in the tuples array
507 : * initially. Make it a power of 2 to avoid wasting memory when
508 : * resizing (since palloc likes powers of 2).
509 : */
1096 drowley 510 131895 : collector->lentuples = pg_nextpower2_32(Max(16, nentries));
209 peter 511 GNC 131895 : collector->tuples = palloc_array(IndexTuple, collector->lentuples);
512 : }
1572 tgl 513 CBC 60039 : else if (collector->lentuples < collector->ntuples + nentries)
514 : {
515 : /*
516 : * Advance lentuples to the next suitable power of 2. This won't
517 : * overflow, though we could get to a value that exceeds
518 : * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
519 : */
1096 drowley 520 UBC 0 : collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
209 peter 521 UNC 0 : collector->tuples = repalloc_array(collector->tuples,
522 : IndexTuple, collector->lentuples);
523 : }
524 :
525 : /*
526 : * Build an index tuple for each key value, and add to array. In pending
527 : * tuples we just stick the heap TID into t_tid.
528 : */
5129 tgl 529 CBC 767493 : for (i = 0; i < nentries; i++)
530 : {
531 : IndexTuple itup;
532 :
4475 533 575559 : itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
534 : NULL, 0, 0, true);
535 575559 : itup->t_tid = *ht_ctid;
536 575559 : collector->tuples[collector->ntuples++] = itup;
537 575559 : collector->sumsize += IndexTupleSize(itup);
538 : }
5129 539 191934 : }
540 :
541 : /*
542 : * Deletes pending list pages up to (not including) newHead page.
543 : * If newHead == InvalidBlockNumber then function drops the whole list.
544 : *
545 : * metapage is pinned and exclusive-locked throughout this function.
546 : */
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
		  bool fill_fsm, IndexBulkDeleteResult *stats)
{
	Page		metapage;
	GinMetaPageData *metadata;
	BlockNumber blknoToDelete;

	metapage = BufferGetPage(metabuffer);
	metadata = GinPageGetMeta(metapage);
	blknoToDelete = metadata->head;

	/* delete pages in batches of at most GIN_NDELETE_AT_ONCE */
	do
	{
		Page		page;
		int			i;
		int64		nDeletedHeapTuples = 0;
		ginxlogDeleteListPages data;
		Buffer		buffers[GIN_NDELETE_AT_ONCE];
		BlockNumber freespace[GIN_NDELETE_AT_ONCE];

		/* collect and exclusive-lock the next batch of list pages */
		data.ndeleted = 0;
		while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
		{
			freespace[data.ndeleted] = blknoToDelete;
			buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
			LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
			page = BufferGetPage(buffers[data.ndeleted]);

			data.ndeleted++;

			Assert(!GinPageIsDeleted(page));

			/* maxoff on a list page counts completed heap rows */
			nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
			blknoToDelete = GinPageGetOpaque(page)->rightlink;
		}

		if (stats)
			stats->pages_deleted += data.ndeleted;

		/*
		 * This operation touches an unusually large number of pages, so
		 * prepare the XLogInsert machinery for that before entering the
		 * critical section.
		 */
		if (RelationNeedsWAL(index))
			XLogEnsureRecordSpace(data.ndeleted, 0);

		START_CRIT_SECTION();

		metadata->head = blknoToDelete;

		Assert(metadata->nPendingPages >= data.ndeleted);
		metadata->nPendingPages -= data.ndeleted;
		Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
		metadata->nPendingHeapTuples -= nDeletedHeapTuples;

		if (blknoToDelete == InvalidBlockNumber)
		{
			/* the whole list is gone: reset metadata to the empty state */
			metadata->tail = InvalidBlockNumber;
			metadata->tailFreeSize = 0;
			metadata->nPendingPages = 0;
			metadata->nPendingHeapTuples = 0;
		}

		/*
		 * Set pd_lower just past the end of the metadata.  This is essential,
		 * because without doing so, metadata will be lost if xlog.c
		 * compresses the page.  (We must do this here because pre-v11
		 * versions of PG did not set the metapage's pd_lower correctly, so a
		 * pg_upgraded index might contain the wrong value.)
		 */
		((PageHeader) metapage)->pd_lower =
			((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

		MarkBufferDirty(metabuffer);

		for (i = 0; i < data.ndeleted; i++)
		{
			page = BufferGetPage(buffers[i]);
			GinPageGetOpaque(page)->flags = GIN_DELETED;
			MarkBufferDirty(buffers[i]);
		}

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, metabuffer,
							   REGBUF_WILL_INIT | REGBUF_STANDARD);
			for (i = 0; i < data.ndeleted; i++)
				XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

			memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

			XLogRegisterData((char *) &data,
							 sizeof(ginxlogDeleteListPages));

			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
			PageSetLSN(metapage, recptr);

			for (i = 0; i < data.ndeleted; i++)
			{
				page = BufferGetPage(buffers[i]);
				PageSetLSN(page, recptr);
			}
		}

		for (i = 0; i < data.ndeleted; i++)
			UnlockReleaseBuffer(buffers[i]);

		END_CRIT_SECTION();

		/* outside the critical section, report the freed pages to the FSM */
		for (i = 0; fill_fsm && i < data.ndeleted; i++)
			RecordFreeIndexPage(index, freespace[i]);

	} while (blknoToDelete != newHead);
}
666 :
667 : /* Initialize empty KeyArray */
668 : static void
4475 669 15 : initKeyArray(KeyArray *keys, int32 maxvalues)
670 : {
209 peter 671 GNC 15 : keys->keys = palloc_array(Datum, maxvalues);
672 15 : keys->categories = palloc_array(GinNullCategory, maxvalues);
4475 tgl 673 CBC 15 : keys->nvalues = 0;
674 15 : keys->maxvalues = maxvalues;
4475 tgl 675 GIC 15 : }
676 :
677 : /* Add datum to KeyArray, resizing if needed */
4475 tgl 678 ECB : static void
4475 tgl 679 GIC 575484 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
4475 tgl 680 ECB : {
4475 tgl 681 GIC 575484 : if (keys->nvalues >= keys->maxvalues)
5129 tgl 682 EUB : {
4475 tgl 683 UBC 0 : keys->maxvalues *= 2;
209 peter 684 UNC 0 : keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
685 0 : keys->categories = repalloc_array(keys->categories, GinNullCategory, keys->maxvalues);
5129 tgl 686 ECB : }
687 :
4475 tgl 688 CBC 575484 : keys->keys[keys->nvalues] = datum;
4475 tgl 689 GIC 575484 : keys->categories[keys->nvalues] = category;
690 575484 : keys->nvalues++;
5129 691 575484 : }
692 :
693 : /*
694 : * Collect data from a pending-list page in preparation for insertion into
695 : * the main index.
696 : *
697 : * Go through all tuples >= startoff on page and collect values in accum
698 : *
699 : * Note that ka is just workspace --- it does not carry any state across
5129 tgl 700 ECB : * calls.
701 : */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
				   Page page, OffsetNumber startoff)
{
	ItemPointerData heapptr;	/* heap TID of the current run of tuples */
	OffsetNumber i,
				maxoff;
	OffsetNumber attrnum;		/* index column of the current run */

	/* reset *ka to empty */
	ka->nvalues = 0;

	maxoff = PageGetMaxOffsetNumber(page);
	Assert(maxoff >= FirstOffsetNumber);
	ItemPointerSetInvalid(&heapptr);
	attrnum = 0;

	for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		OffsetNumber curattnum;
		Datum		curkey;
		GinNullCategory curcategory;

		/* Check for change of heap TID or attnum */
		curattnum = gintuple_get_attrnum(accum->ginstate, itup);

		if (!ItemPointerIsValid(&heapptr))
		{
			/* first tuple examined: start the first run */
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}
		else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
				   curattnum == attrnum))
		{
			/*
			 * ginInsertBAEntries can insert several datums per call, but only
			 * for one heap tuple and one column.  So call it at a boundary,
			 * and reset ka.
			 */
			ginInsertBAEntries(accum, &heapptr, attrnum,
							   ka->keys, ka->categories, ka->nvalues);
			ka->nvalues = 0;
			heapptr = itup->t_tid;
			attrnum = curattnum;
		}

		/* Add key to KeyArray */
		curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
		addDatum(ka, curkey, curcategory);
	}

	/* Dump out all remaining keys */
	ginInsertBAEntries(accum, &heapptr, attrnum,
					   ka->keys, ka->categories, ka->nvalues);
}
758 :
759 : /*
760 : * Move tuples from pending pages into regular GIN structure.
761 : *
762 : * On first glance it looks completely not crash-safe. But if we crash
763 : * after posting entries to the main index and before removing them from the
764 : * pending list, it's okay because when we redo the posting later on, nothing
765 : * bad will happen.
766 : *
767 : * fill_fsm indicates that ginInsertCleanup should add deleted pages
768 : * to FSM otherwise caller is responsible to put deleted pages into
769 : * FSM.
770 : *
5129 tgl 771 ECB : * If stats isn't null, we count deleted pending pages into the counts.
772 : */
773 : void
2537 teodor 774 GIC 40 : ginInsertCleanup(GinState *ginstate, bool full_clean,
1970 rhaas 775 ECB : bool fill_fsm, bool forceCleanup,
776 : IndexBulkDeleteResult *stats)
777 : {
4475 tgl 778 GIC 40 : Relation index = ginstate->index;
779 : Buffer metabuffer,
780 : buffer;
781 : Page metapage,
782 : page;
783 : GinMetaPageData *metadata;
784 : MemoryContext opCtx,
785 : oldCtx;
786 : BuildAccumulator accum;
4475 tgl 787 ECB : KeyArray datums;
2537 teodor 788 : BlockNumber blkno,
789 : blknoFinish;
2537 teodor 790 GIC 40 : bool cleanupFinish = false;
2771 791 40 : bool fsm_vac = false;
792 : Size workMemory;
793 :
794 : /*
795 : * We would like to prevent concurrent cleanup process. For that we will
796 : * lock metapage in exclusive mode using LockPage() call. Nobody other
797 : * will use that lock for metapage, so we keep possibility of concurrent
2495 rhaas 798 ECB : * insertion into pending list
799 : */
800 :
1970 rhaas 801 GIC 40 : if (forceCleanup)
802 : {
803 : /*
2495 rhaas 804 ECB : * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
805 : * and we would like to wait concurrent cleanup to finish.
2537 teodor 806 : */
2537 teodor 807 CBC 40 : LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
2537 teodor 808 GIC 40 : workMemory =
809 44 : (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
2495 rhaas 810 44 : autovacuum_work_mem : maintenance_work_mem;
811 : }
812 : else
813 : {
814 : /*
815 : * We are called from regular insert and if we see concurrent cleanup
2495 rhaas 816 EUB : * just exit in hope that concurrent process will clean up pending
2495 rhaas 817 ECB : * list.
2537 teodor 818 EUB : */
2537 teodor 819 UIC 0 : if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
2537 teodor 820 GIC 25 : return;
2537 teodor 821 LBC 0 : workMemory = work_mem;
2537 teodor 822 ECB : }
5129 tgl 823 :
5129 tgl 824 CBC 40 : metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
5129 tgl 825 GIC 40 : LockBuffer(metabuffer, GIN_SHARE);
2545 kgrittn 826 CBC 40 : metapage = BufferGetPage(metabuffer);
5129 tgl 827 GIC 40 : metadata = GinPageGetMeta(metapage);
828 :
5050 bruce 829 CBC 40 : if (metadata->head == InvalidBlockNumber)
5129 tgl 830 ECB : {
831 : /* Nothing to do */
5129 tgl 832 GIC 25 : UnlockReleaseBuffer(metabuffer);
2537 teodor 833 25 : UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
5129 tgl 834 25 : return;
835 : }
836 :
837 : /*
2537 teodor 838 ECB : * Remember a tail page to prevent infinite cleanup if other backends add
839 : * new tuples faster than we can cleanup.
840 : */
2537 teodor 841 GIC 15 : blknoFinish = metadata->tail;
842 :
5129 tgl 843 ECB : /*
844 : * Read and lock head of pending list
845 : */
5129 tgl 846 CBC 15 : blkno = metadata->head;
5129 tgl 847 GIC 15 : buffer = ReadBuffer(index, blkno);
5129 tgl 848 CBC 15 : LockBuffer(buffer, GIN_SHARE);
2545 kgrittn 849 GIC 15 : page = BufferGetPage(buffer);
850 :
5129 tgl 851 15 : LockBuffer(metabuffer, GIN_UNLOCK);
852 :
5129 tgl 853 ECB : /*
854 : * Initialize. All temporary space will be in opCtx
855 : */
5129 tgl 856 GIC 15 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
5129 tgl 857 ECB : "GIN insert cleanup temporary context",
858 : ALLOCSET_DEFAULT_SIZES);
859 :
5129 tgl 860 CBC 15 : oldCtx = MemoryContextSwitchTo(opCtx);
5129 tgl 861 ECB :
4475 tgl 862 GIC 15 : initKeyArray(&datums, 128);
5129 863 15 : ginInitBA(&accum);
864 15 : accum.ginstate = ginstate;
865 :
866 : /*
867 : * At the top of this loop, we have pin and lock on the current page of
868 : * the pending list. However, we'll release that before exiting the loop.
869 : * Note we also have pin but not lock on the metapage.
5129 tgl 870 ECB : */
871 : for (;;)
872 : {
2537 teodor 873 GIC 1420 : Assert(!GinPageIsDeleted(page));
874 :
875 : /*
876 : * Are we walk through the page which as we remember was a tail when
877 : * we start our cleanup? But if caller asks us to clean up whole
2495 rhaas 878 ECB : * pending list then ignore old tail, we will work until list becomes
2495 rhaas 879 EUB : * empty.
880 : */
2537 teodor 881 GIC 1420 : if (blkno == blknoFinish && full_clean == false)
2537 teodor 882 UIC 0 : cleanupFinish = true;
883 :
5129 tgl 884 ECB : /*
885 : * read page's datums into accum
886 : */
5129 tgl 887 GIC 1420 : processPendingPage(&accum, &datums, page, FirstOffsetNumber);
888 :
2771 teodor 889 1420 : vacuum_delay_point();
890 :
891 : /*
892 : * Is it time to flush memory to disk? Flush if we are at the end of
5050 bruce 893 ECB : * the pending list, or if we have a full row and memory is getting
894 : * full.
5129 tgl 895 : */
5129 tgl 896 GBC 1420 : if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
5129 tgl 897 GIC 1405 : (GinPageHasFullRow(page) &&
2537 teodor 898 1405 : (accum.allocatedMemory >= workMemory * 1024L)))
5129 tgl 899 UIC 0 : {
900 : ItemPointerData *list;
901 : uint32 nlist;
902 : Datum key;
903 : GinNullCategory category;
904 : OffsetNumber maxoff,
905 : attnum;
906 :
907 : /*
908 : * Unlock current page to increase performance. Changes of page
5050 bruce 909 ECB : * will be checked later by comparing maxoff after completion of
910 : * memory flush.
911 : */
5129 tgl 912 GIC 15 : maxoff = PageGetMaxOffsetNumber(page);
913 15 : LockBuffer(buffer, GIN_UNLOCK);
914 :
915 : /*
916 : * Moving collected data into regular structure can take
5129 tgl 917 ECB : * significant amount of time - so, run it without locking pending
918 : * list.
919 : */
4634 tgl 920 GIC 15 : ginBeginBAScan(&accum);
4475 tgl 921 CBC 183051 : while ((list = ginGetBAEntry(&accum,
2118 tgl 922 GIC 183051 : &attnum, &key, &category, &nlist)) != NULL)
5129 tgl 923 ECB : {
4475 tgl 924 GIC 183036 : ginEntryInsert(ginstate, attnum, key, category,
925 : list, nlist, NULL);
2771 teodor 926 183036 : vacuum_delay_point();
927 : }
928 :
5129 tgl 929 ECB : /*
930 : * Lock the whole list to remove pages
931 : */
5129 tgl 932 CBC 15 : LockBuffer(metabuffer, GIN_EXCLUSIVE);
5129 tgl 933 GIC 15 : LockBuffer(buffer, GIN_SHARE);
934 :
2537 teodor 935 15 : Assert(!GinPageIsDeleted(page));
936 :
937 : /*
938 : * While we left the page unlocked, more stuff might have gotten
939 : * added to it. If so, process those entries immediately. There
940 : * shouldn't be very many, so we don't worry about the fact that
941 : * we're doing this with exclusive lock. Insertion algorithm
4003 rhaas 942 ECB : * guarantees that inserted row(s) will not continue on next page.
943 : * NOTE: intentionally no vacuum_delay_point in this loop.
5129 tgl 944 EUB : */
5050 bruce 945 GBC 15 : if (PageGetMaxOffsetNumber(page) != maxoff)
946 : {
5129 tgl 947 UBC 0 : ginInitBA(&accum);
5050 bruce 948 0 : processPendingPage(&accum, &datums, page, maxoff + 1);
5129 tgl 949 EUB :
4634 tgl 950 UBC 0 : ginBeginBAScan(&accum);
4475 tgl 951 UIC 0 : while ((list = ginGetBAEntry(&accum,
2118 952 0 : &attnum, &key, &category, &nlist)) != NULL)
4475 953 0 : ginEntryInsert(ginstate, attnum, key, category,
954 : list, nlist, NULL);
955 : }
956 :
5129 tgl 957 ECB : /*
958 : * Remember next page - it will become the new list head
959 : */
5129 tgl 960 GIC 15 : blkno = GinPageGetOpaque(page)->rightlink;
2118 961 15 : UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
962 : * locking */
963 :
964 : /*
2495 rhaas 965 ECB : * remove read pages from pending list, at this point all content
966 : * of read pages is in regular structure
967 : */
2537 teodor 968 CBC 15 : shiftList(index, metabuffer, blkno, fill_fsm, stats);
969 :
2771 teodor 970 ECB : /* At this point, some pending pages have been freed up */
2771 teodor 971 CBC 15 : fsm_vac = true;
972 :
5050 bruce 973 GIC 15 : Assert(blkno == metadata->head);
5129 tgl 974 15 : LockBuffer(metabuffer, GIN_UNLOCK);
975 :
                  976 :             /*
2537 teodor       977 ECB :          * If we removed the whole pending list, or we have cleaned up
                  978 :              * through the tail page (which we remembered at the start of our
                  979 :              * cleanup), then just exit.
                  980 :              */
2537 teodor 980 GIC 15 : if (blkno == InvalidBlockNumber || cleanupFinish)
981 : break;
982 :
5129 tgl 983 EUB : /*
984 : * release memory used so far and reinit state
985 : */
5129 tgl 986 UIC 0 : MemoryContextReset(opCtx);
4475 987 0 : initKeyArray(&datums, datums.maxvalues);
5129 988 0 : ginInitBA(&accum);
5129 tgl 989 ECB : }
990 : else
991 : {
5129 tgl 992 GIC 1405 : blkno = GinPageGetOpaque(page)->rightlink;
993 1405 : UnlockReleaseBuffer(buffer);
994 : }
995 :
5129 tgl 996 ECB : /*
997 : * Read next page in pending list
998 : */
2771 teodor 999 CBC 1405 : vacuum_delay_point();
5129 tgl 1000 GIC 1405 : buffer = ReadBuffer(index, blkno);
1001 1405 : LockBuffer(buffer, GIN_SHARE);
2545 kgrittn 1002 CBC 1405 : page = BufferGetPage(buffer);
5129 tgl 1003 ECB : }
1004 :
2537 teodor 1005 GIC 15 : UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
5129 tgl 1006 15 : ReleaseBuffer(metabuffer);
1007 :
1008 : /*
1009 : * As pending list pages can have a high churn rate, it is desirable to
1363 michael 1010 ECB : * recycle them immediately to the FreeSpaceMap when ordinary backends
2495 rhaas 1011 : * clean the list.
1012 : */
2755 teodor 1013 GIC 15 : if (fsm_vac && fill_fsm)
2771 teodor 1014 CBC 6 : IndexFreeSpaceMapVacuum(index);
2771 teodor 1015 ECB :
1016 : /* Clean up temporary space */
5129 tgl 1017 GIC 15 : MemoryContextSwitchTo(oldCtx);
1018 15 : MemoryContextDelete(opCtx);
1019 : }
1020 :
1021 : /*
2628 fujii 1022 ECB : * SQL-callable function to clean the insert pending list
1023 : */
1024 : Datum
2628 fujii 1025 CBC 9 : gin_clean_pending_list(PG_FUNCTION_ARGS)
1026 : {
2628 fujii 1027 GIC 9 : Oid indexoid = PG_GETARG_OID(0);
1466 tgl 1028 9 : Relation indexRel = index_open(indexoid, RowExclusiveLock);
2628 fujii 1029 ECB : IndexBulkDeleteResult stats;
2628 fujii 1030 EUB : GinState ginstate;
1031 :
2628 fujii 1032 GIC 9 : if (RecoveryInProgress())
2626 peter_e 1033 UIC 0 : ereport(ERROR,
1034 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1035 : errmsg("recovery is in progress"),
2118 tgl 1036 ECB : errhint("GIN pending list cannot be cleaned up during recovery.")));
2628 fujii 1037 :
2628 fujii 1038 EUB : /* Must be a GIN index */
2628 fujii 1039 GIC 9 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1040 9 : indexRel->rd_rel->relam != GIN_AM_OID)
2628 fujii 1041 UIC 0 : ereport(ERROR,
1042 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1043 : errmsg("\"%s\" is not a GIN index",
1044 : RelationGetRelationName(indexRel))));
1045 :
1046 : /*
1047 : * Reject attempts to read non-local temporary relations; we would be
2628 fujii 1048 ECB : * likely to get wrong data since we have no visibility into the owning
2628 fujii 1049 EUB : * session's local buffers.
1050 : */
2628 fujii 1051 GIC 9 : if (RELATION_IS_OTHER_TEMP(indexRel))
2628 fujii 1052 UIC 0 : ereport(ERROR,
1053 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2118 tgl 1054 ECB : errmsg("cannot access temporary indexes of other sessions")));
2628 fujii 1055 EUB :
1056 : /* User must own the index (comparable to privileges needed for VACUUM) */
147 peter 1057 GNC 9 : if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1954 peter_e 1058 LBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
2628 fujii 1059 0 : RelationGetRelationName(indexRel));
2628 fujii 1060 ECB :
2628 fujii 1061 GIC 9 : memset(&stats, 0, sizeof(stats));
2628 fujii 1062 CBC 9 : initGinState(&ginstate, indexRel);
1970 rhaas 1063 GIC 9 : ginInsertCleanup(&ginstate, true, true, true, &stats);
2628 fujii 1064 ECB :
1466 tgl 1065 GIC 9 : index_close(indexRel, RowExclusiveLock);
1066 :
2628 fujii 1067 9 : PG_RETURN_INT64((int64) stats.pages_deleted);
1068 : }
|