LCOV - differential code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions) Coverage Total Hit LBC UIC UBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 89.9 % 557 501 1 6 49 133 13 355 7 137 8
Current Date: 2023-04-08 15:15:32 Functions: 96.3 % 27 26 1 9 6 11 9
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*
       2                 :  * brin.c
       3                 :  *      Implementation of BRIN indexes for Postgres
       4                 :  *
       5                 :  * See src/backend/access/brin/README for details.
       6                 :  *
       7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       8                 :  * Portions Copyright (c) 1994, Regents of the University of California
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/access/brin/brin.c
      12                 :  *
      13                 :  * TODO
      14                 :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
      15                 :  */
      16                 : #include "postgres.h"
      17                 : 
      18                 : #include "access/brin.h"
      19                 : #include "access/brin_page.h"
      20                 : #include "access/brin_pageops.h"
      21                 : #include "access/brin_xlog.h"
      22                 : #include "access/relation.h"
      23                 : #include "access/reloptions.h"
      24                 : #include "access/relscan.h"
      25                 : #include "access/table.h"
      26                 : #include "access/tableam.h"
      27                 : #include "access/xloginsert.h"
      28                 : #include "catalog/index.h"
      29                 : #include "catalog/pg_am.h"
      30                 : #include "commands/vacuum.h"
      31                 : #include "miscadmin.h"
      32                 : #include "pgstat.h"
      33                 : #include "postmaster/autovacuum.h"
      34                 : #include "storage/bufmgr.h"
      35                 : #include "storage/freespace.h"
      36                 : #include "utils/acl.h"
      37                 : #include "utils/builtins.h"
      38                 : #include "utils/datum.h"
      39                 : #include "utils/guc.h"
      40                 : #include "utils/index_selfuncs.h"
      41                 : #include "utils/memutils.h"
      42                 : #include "utils/rel.h"
      43                 : 
      44                 : 
      45                 : /*
      46                 :  * We use a BrinBuildState during initial construction of a BRIN index.
      47                 :  * The running state is kept in a BrinMemTuple.
      48                 :  */
      49                 : typedef struct BrinBuildState
      50                 : {
      51                 :     Relation    bs_irel;
      52                 :     int         bs_numtuples;
      53                 :     Buffer      bs_currentInsertBuf;
      54                 :     BlockNumber bs_pagesPerRange;
      55                 :     BlockNumber bs_currRangeStart;
      56                 :     BrinRevmap *bs_rmAccess;
      57                 :     BrinDesc   *bs_bdesc;
      58                 :     BrinMemTuple *bs_dtuple;
      59                 : } BrinBuildState;
      60                 : 
      61                 : /*
      62                 :  * Struct used as "opaque" during index scans
      63                 :  */
      64                 : typedef struct BrinOpaque
      65                 : {
      66                 :     BlockNumber bo_pagesPerRange;
      67                 :     BrinRevmap *bo_rmAccess;
      68                 :     BrinDesc   *bo_bdesc;
      69                 : } BrinOpaque;
      70                 : 
      71                 : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber
      72                 : 
      73                 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
      74                 :                                                   BrinRevmap *revmap, BlockNumber pagesPerRange);
      75                 : static void terminate_brin_buildstate(BrinBuildState *state);
      76                 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
      77                 :                           bool include_partial, double *numSummarized, double *numExisting);
      78                 : static void form_and_insert_tuple(BrinBuildState *state);
      79                 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
      80                 :                          BrinTuple *b);
      81                 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
      82                 : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
      83                 :                                 BrinMemTuple *dtup, Datum *values, bool *nulls);
      84                 : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
      85                 : 
      86                 : /*
      87                 :  * BRIN handler function: return IndexAmRoutine with access method parameters
      88                 :  * and callbacks.
      89                 :  */
      90                 : Datum
      91 GIC         901 : brinhandler(PG_FUNCTION_ARGS)
      92 ECB             : {
      93 GIC         901 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      94 ECB             : 
      95 GIC         901 :     amroutine->amstrategies = 0;
      96 CBC         901 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
      97             901 :     amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
      98             901 :     amroutine->amcanorder = false;
      99             901 :     amroutine->amcanorderbyop = false;
     100             901 :     amroutine->amcanbackward = false;
     101             901 :     amroutine->amcanunique = false;
     102             901 :     amroutine->amcanmulticol = true;
     103             901 :     amroutine->amoptionalkey = true;
     104             901 :     amroutine->amsearcharray = false;
     105             901 :     amroutine->amsearchnulls = true;
     106             901 :     amroutine->amstorage = true;
     107             901 :     amroutine->amclusterable = false;
     108             901 :     amroutine->ampredlocks = false;
     109             901 :     amroutine->amcanparallel = false;
     110             901 :     amroutine->amcaninclude = false;
     111             901 :     amroutine->amusemaintenanceworkmem = false;
     112 GNC         901 :     amroutine->amsummarizing = true;
     113 CBC         901 :     amroutine->amparallelvacuumoptions =
     114 ECB             :         VACUUM_OPTION_PARALLEL_CLEANUP;
     115 CBC         901 :     amroutine->amkeytype = InvalidOid;
     116                 : 
     117             901 :     amroutine->ambuild = brinbuild;
     118 GIC         901 :     amroutine->ambuildempty = brinbuildempty;
     119 CBC         901 :     amroutine->aminsert = brininsert;
     120             901 :     amroutine->ambulkdelete = brinbulkdelete;
     121             901 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
     122             901 :     amroutine->amcanreturn = NULL;
     123             901 :     amroutine->amcostestimate = brincostestimate;
     124             901 :     amroutine->amoptions = brinoptions;
     125             901 :     amroutine->amproperty = NULL;
     126             901 :     amroutine->ambuildphasename = NULL;
     127             901 :     amroutine->amvalidate = brinvalidate;
     128             901 :     amroutine->amadjustmembers = NULL;
     129             901 :     amroutine->ambeginscan = brinbeginscan;
     130             901 :     amroutine->amrescan = brinrescan;
     131             901 :     amroutine->amgettuple = NULL;
     132             901 :     amroutine->amgetbitmap = bringetbitmap;
     133             901 :     amroutine->amendscan = brinendscan;
     134             901 :     amroutine->ammarkpos = NULL;
     135             901 :     amroutine->amrestrpos = NULL;
     136             901 :     amroutine->amestimateparallelscan = NULL;
     137             901 :     amroutine->aminitparallelscan = NULL;
     138             901 :     amroutine->amparallelrescan = NULL;
     139 ECB             : 
     140 CBC         901 :     PG_RETURN_POINTER(amroutine);
     141                 : }
     142 ECB             : 
     143                 : /*
     144                 :  * A tuple in the heap is being inserted.  To keep a brin index up to date,
     145                 :  * we need to obtain the relevant index tuple and compare its stored values
     146                 :  * with those of the new tuple.  If the tuple values are not consistent with
     147                 :  * the summary tuple, we need to update the index tuple.
     148                 :  *
     149                 :  * If autosummarization is enabled, check if we need to summarize the previous
     150                 :  * page range.
     151                 :  *
     152                 :  * If the range is not currently summarized (i.e. the revmap returns NULL for
     153                 :  * it), there's nothing to do for this tuple.
     154                 :  */
     155                 : bool
     156 GIC       38960 : brininsert(Relation idxRel, Datum *values, bool *nulls,
     157                 :            ItemPointer heaptid, Relation heapRel,
     158 ECB             :            IndexUniqueCheck checkUnique,
     159                 :            bool indexUnchanged,
     160                 :            IndexInfo *indexInfo)
     161                 : {
     162                 :     BlockNumber pagesPerRange;
     163                 :     BlockNumber origHeapBlk;
     164                 :     BlockNumber heapBlk;
     165 GIC       38960 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
     166                 :     BrinRevmap *revmap;
     167 CBC       38960 :     Buffer      buf = InvalidBuffer;
     168 GIC       38960 :     MemoryContext tupcxt = NULL;
     169 CBC       38960 :     MemoryContext oldcxt = CurrentMemoryContext;
     170           38960 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
     171 ECB             : 
     172 CBC       38960 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
     173                 : 
     174 ECB             :     /*
     175                 :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
     176                 :      * is the first block in the corresponding page range.
     177                 :      */
     178 GIC       38960 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
     179           38960 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     180 ECB             : 
     181                 :     for (;;)
     182 UIC           0 :     {
     183 GIC       38960 :         bool        need_insert = false;
     184 EUB             :         OffsetNumber off;
     185 ECB             :         BrinTuple  *brtup;
     186                 :         BrinMemTuple *dtup;
     187                 : 
     188 GIC       38960 :         CHECK_FOR_INTERRUPTS();
     189                 : 
     190 ECB             :         /*
     191                 :          * If auto-summarization is enabled and we just inserted the first
     192                 :          * tuple into the first block of a new non-first page range, request a
     193                 :          * summarization run of the previous range.
     194                 :          */
     195 GIC       38960 :         if (autosummarize &&
     196              78 :             heapBlk > 0 &&
     197 CBC          78 :             heapBlk == origHeapBlk &&
     198              78 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
     199 ECB             :         {
     200 CBC           4 :             BlockNumber lastPageRange = heapBlk - 1;
     201                 :             BrinTuple  *lastPageTuple;
     202 ECB             : 
     203                 :             lastPageTuple =
     204 GIC           4 :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
     205                 :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     206 CBC           4 :             if (!lastPageTuple)
     207                 :             {
     208 ECB             :                 bool        recorded;
     209                 : 
     210 GIC           3 :                 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
     211                 :                                                  RelationGetRelid(idxRel),
     212 ECB             :                                                  lastPageRange);
     213 GIC           3 :                 if (!recorded)
     214 UIC           0 :                     ereport(LOG,
     215 ECB             :                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     216 EUB             :                              errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
     217                 :                                     RelationGetRelationName(idxRel),
     218                 :                                     lastPageRange)));
     219                 :             }
     220                 :             else
     221 GIC           1 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     222                 :         }
     223 ECB             : 
     224 GIC       38960 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
     225                 :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     226 ECB             : 
     227                 :         /* if range is unsummarized, there's nothing to do */
     228 GIC       38960 :         if (!brtup)
     229           30870 :             break;
     230 ECB             : 
     231                 :         /* First time through in this statement? */
     232 GIC        8090 :         if (bdesc == NULL)
     233                 :         {
     234 CBC         509 :             MemoryContextSwitchTo(indexInfo->ii_Context);
     235 GIC         509 :             bdesc = brin_build_desc(idxRel);
     236 CBC         509 :             indexInfo->ii_AmCache = (void *) bdesc;
     237             509 :             MemoryContextSwitchTo(oldcxt);
     238 ECB             :         }
     239                 :         /* First time through in this brininsert call? */
     240 GIC        8090 :         if (tupcxt == NULL)
     241                 :         {
     242 CBC        8090 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
     243                 :                                            "brininsert cxt",
     244 ECB             :                                            ALLOCSET_DEFAULT_SIZES);
     245 GIC        8090 :             MemoryContextSwitchTo(tupcxt);
     246                 :         }
     247 ECB             : 
     248 GIC        8090 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
     249                 : 
     250 CBC        8090 :         need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
     251                 : 
     252            8090 :         if (!need_insert)
     253                 :         {
     254 ECB             :             /*
     255                 :              * The tuple is consistent with the new values, so there's nothing
     256                 :              * to do.
     257                 :              */
     258 GIC        6412 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     259                 :         }
     260 ECB             :         else
     261                 :         {
     262 GIC        1678 :             Page        page = BufferGetPage(buf);
     263            1678 :             ItemId      lp = PageGetItemId(page, off);
     264 ECB             :             Size        origsz;
     265                 :             BrinTuple  *origtup;
     266                 :             Size        newsz;
     267                 :             BrinTuple  *newtup;
     268                 :             bool        samepage;
     269                 : 
     270                 :             /*
     271                 :              * Make a copy of the old tuple, so that we can compare it after
     272                 :              * re-acquiring the lock.
     273                 :              */
     274 GIC        1678 :             origsz = ItemIdGetLength(lp);
     275            1678 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
     276 ECB             : 
     277                 :             /*
     278                 :              * Before releasing the lock, check if we can attempt a same-page
     279                 :              * update.  Another process could insert a tuple concurrently in
     280                 :              * the same page though, so downstream we must be prepared to cope
     281                 :              * if this turns out to not be possible after all.
     282                 :              */
     283 GIC        1678 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
     284            1678 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
     285 CBC        1678 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     286 ECB             : 
     287                 :             /*
     288                 :              * Try to update the tuple.  If this doesn't work for whatever
     289                 :              * reason, we need to restart from the top; the revmap might be
     290                 :              * pointing at a different tuple for this block now, so we need to
     291                 :              * recompute to ensure both our new heap tuple and the other
     292                 :              * inserter's are covered by the combined tuple.  It might be that
     293                 :              * we don't need to update at all.
     294                 :              */
     295 GIC        1678 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
     296                 :                                buf, off, origtup, origsz, newtup, newsz,
     297 ECB             :                                samepage))
     298                 :             {
     299                 :                 /* no luck; start over */
     300 UIC           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);
     301               0 :                 continue;
     302 EUB             :             }
     303                 :         }
     304                 : 
     305                 :         /* success! */
     306 GIC        8090 :         break;
     307                 :     }
     308 ECB             : 
     309 GIC       38960 :     brinRevmapTerminate(revmap);
     310           38960 :     if (BufferIsValid(buf))
     311 CBC        8091 :         ReleaseBuffer(buf);
     312           38960 :     MemoryContextSwitchTo(oldcxt);
     313           38960 :     if (tupcxt != NULL)
     314            8090 :         MemoryContextDelete(tupcxt);
     315 ECB             : 
     316 CBC       38960 :     return false;
     317                 : }
     318 ECB             : 
     319                 : /*
     320                 :  * Initialize state for a BRIN index scan.
     321                 :  *
     322                 :  * We read the metapage here to determine the pages-per-range number that this
     323                 :  * index was built with.  Note that since this cannot be changed while we're
     324                 :  * holding a lock on the index, it's not necessary to recompute it during brinrescan.
     325                 :  */
     326                 : IndexScanDesc
     327 GIC        1290 : brinbeginscan(Relation r, int nkeys, int norderbys)
     328                 : {
     329 ECB             :     IndexScanDesc scan;
     330                 :     BrinOpaque *opaque;
     331                 : 
     332 GIC        1290 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
     333                 : 
     334 GNC        1290 :     opaque = palloc_object(BrinOpaque);
     335 GIC        1290 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
     336 ECB             :                                                scan->xs_snapshot);
     337 CBC        1290 :     opaque->bo_bdesc = brin_build_desc(r);
     338 GIC        1290 :     scan->opaque = opaque;
     339 ECB             : 
     340 CBC        1290 :     return scan;
     341                 : }
     342 ECB             : 
     343                 : /*
     344                 :  * Execute the index scan.
     345                 :  *
     346                 :  * This works by reading index TIDs from the revmap, and obtaining the index
     347                 :  * tuples pointed to by them; the summary values in the index tuples are
     348                 :  * compared to the scan keys.  We return into the TID bitmap all the pages in
     349                 :  * ranges corresponding to index tuples that match the scan keys.
     350                 :  *
     351                 :  * If a TID from the revmap is read as InvalidTID, we know that range is
     352                 :  * unsummarized.  Pages in those ranges need to be returned regardless of scan
     353                 :  * keys.
     354                 :  */
     355                 : int64
     356 GIC        1290 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     357                 : {
     358 CBC        1290 :     Relation    idxRel = scan->indexRelation;
     359 GIC        1290 :     Buffer      buf = InvalidBuffer;
     360 ECB             :     BrinDesc   *bdesc;
     361                 :     Oid         heapOid;
     362                 :     Relation    heapRel;
     363                 :     BrinOpaque *opaque;
     364                 :     BlockNumber nblocks;
     365                 :     BlockNumber heapBlk;
     366 GIC        1290 :     int         totalpages = 0;
     367                 :     FmgrInfo   *consistentFn;
     368 ECB             :     MemoryContext oldcxt;
     369                 :     MemoryContext perRangeCxt;
     370                 :     BrinMemTuple *dtup;
     371 GIC        1290 :     BrinTuple  *btup = NULL;
     372            1290 :     Size        btupsz = 0;
     373 ECB             :     ScanKey   **keys,
     374                 :               **nullkeys;
     375                 :     int        *nkeys,
     376                 :                *nnullkeys;
     377                 :     char       *ptr;
     378                 :     Size        len;
     379                 :     char       *tmp PG_USED_FOR_ASSERTS_ONLY;
     380                 : 
     381 GIC        1290 :     opaque = (BrinOpaque *) scan->opaque;
     382 CBC        1290 :     bdesc = opaque->bo_bdesc;
     383            1290 :     pgstat_count_index_scan(idxRel);
     384 ECB             : 
     385                 :     /*
     386                 :      * We need to know the size of the table so that we know how long to
     387                 :      * iterate on the revmap.
     388                 :      */
     389 GIC        1290 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
     390 CBC        1290 :     heapRel = table_open(heapOid, AccessShareLock);
     391            1290 :     nblocks = RelationGetNumberOfBlocks(heapRel);
     392            1290 :     table_close(heapRel, AccessShareLock);
     393 ECB             : 
     394                 :     /*
     395                 :      * Make room for the consistent support procedures of indexed columns.  We
     396                 :      * don't look them up here; we do that lazily the first time we see a scan
     397                 :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
     398                 :      */
     399 GNC        1290 :     consistentFn = palloc0_array(FmgrInfo, bdesc->bd_tupdesc->natts);
     400 ECB             : 
     401                 :     /*
     402                 :      * Make room for per-attribute lists of scan keys that we'll pass to the
     403                 :      * consistent support procedure. We don't know which attributes have scan
     404                 :      * keys, so we allocate space for all attributes. That may use more memory
     405                 :      * but it's probably cheaper than determining which attributes are used.
     406                 :      *
     407                 :      * We keep null and regular keys separate, so that we can pass just the
     408                 :      * regular keys to the consistent function easily.
     409                 :      *
     410                 :      * To reduce the allocation overhead, we allocate one big chunk and then
     411                 :      * carve it into smaller arrays ourselves. All the pieces have exactly the
     412                 :      * same lifetime, so that's OK.
     413                 :      *
     414                 :      * XXX The widest index can have 32 attributes, so the amount of wasted
     415                 :      * memory is negligible. We could invent a more compact approach (with
     416                 :      * just space for used attributes) but that would make the matching more
     417                 :      * complex so it's not a good trade-off.
     418                 :      */
     419 GIC        1290 :     len =
     420 CBC        1290 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* regular keys */
     421            1290 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
     422            1290 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
     423            1290 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* NULL keys */
     424            1290 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
     425            1290 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     426 ECB             : 
     427 GIC        1290 :     ptr = palloc(len);
     428 CBC        1290 :     tmp = ptr;
     429 ECB             : 
     430 GIC        1290 :     keys = (ScanKey **) ptr;
     431 CBC        1290 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
     432 ECB             : 
     433 GIC        1290 :     nullkeys = (ScanKey **) ptr;
     434 CBC        1290 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
     435 ECB             : 
     436 GIC        1290 :     nkeys = (int *) ptr;
     437 CBC        1290 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     438 ECB             : 
     439 GIC        1290 :     nnullkeys = (int *) ptr;
     440 CBC        1290 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     441 ECB             : 
     442 GIC       34623 :     for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
     443 ECB             :     {
     444 GIC       33333 :         keys[i] = (ScanKey *) ptr;
     445 CBC       33333 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
     446 ECB             : 
     447 GIC       33333 :         nullkeys[i] = (ScanKey *) ptr;
     448 CBC       33333 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
     449 ECB             :     }
     450                 : 
     451 GIC        1290 :     Assert(tmp + len == ptr);
     452 ECB             : 
     453                 :     /* zero the number of keys */
     454 GIC        1290 :     memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
     455 CBC        1290 :     memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
     456 ECB             : 
     457                 :     /* Preprocess the scan keys - split them into per-attribute arrays. */
     458 GNC        2580 :     for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
     459 ECB             :     {
     460 GIC        1290 :         ScanKey     key = &scan->keyData[keyno];
     461 CBC        1290 :         AttrNumber  keyattno = key->sk_attno;
     462 ECB             : 
     463                 :         /*
     464                 :          * The collation of the scan key must match the collation used in the
     465                 :          * index column (but only if the search is not IS NULL / IS NOT NULL).
     466                 :          * Otherwise we shouldn't be using this index ...
     467                 :          */
     468 GIC        1290 :         Assert((key->sk_flags & SK_ISNULL) ||
     469 ECB             :                (key->sk_collation ==
     470                 :                 TupleDescAttr(bdesc->bd_tupdesc,
     471                 :                               keyattno - 1)->attcollation));
     472                 : 
     473                 :         /*
     474                 :          * First time we see this index attribute, so init as needed.
     475                 :          *
     476                 :          * This is a bit of an overkill - we don't know how many scan keys are
     477                 :          * there for this attribute, so we simply allocate the largest number
     478                 :          * possible (as if all keys were for this attribute). This may waste a
     479                 :          * bit of memory, but we only expect a small number of scan keys in
     480                 :          * general, so this should be negligible, and repeated repalloc calls
     481                 :          * are not free either.
     482                 :          */
     483 GIC        1290 :         if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
     484 ECB             :         {
     485                 :             FmgrInfo   *tmp;
     486                 : 
     487                 :             /* First time we see this attribute, so no key/null keys. */
     488 GIC        1290 :             Assert(nkeys[keyattno - 1] == 0);
     489 CBC        1290 :             Assert(nnullkeys[keyattno - 1] == 0);
     490 ECB             : 
     491 GIC        1290 :             tmp = index_getprocinfo(idxRel, keyattno,
     492 ECB             :                                     BRIN_PROCNUM_CONSISTENT);
     493 GIC        1290 :             fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
     494 ECB             :                            CurrentMemoryContext);
     495                 :         }
     496                 : 
     497                 :         /* Add key to the proper per-attribute array. */
     498 GIC        1290 :         if (key->sk_flags & SK_ISNULL)
     499 ECB             :         {
     500 GIC          18 :             nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
     501 CBC          18 :             nnullkeys[keyattno - 1]++;
     502 ECB             :         }
     503                 :         else
     504                 :         {
     505 GIC        1272 :             keys[keyattno - 1][nkeys[keyattno - 1]] = key;
     506 CBC        1272 :             nkeys[keyattno - 1]++;
     507 ECB             :         }
     508                 :     }
     509                 : 
     510                 :     /* allocate an initial in-memory tuple, out of the per-range memcxt */
     511 GIC        1290 :     dtup = brin_new_memtuple(bdesc);
     512 ECB             : 
     513                 :     /*
     514                 :      * Setup and use a per-range memory context, which is reset every time we
     515                 :      * loop below.  This avoids having to free the tuples within the loop.
     516                 :      */
     517 GIC        1290 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
     518 ECB             :                                         "bringetbitmap cxt",
     519                 :                                         ALLOCSET_DEFAULT_SIZES);
     520 GIC        1290 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
     521 ECB             : 
     522                 :     /*
     523                 :      * Now scan the revmap.  We start by querying for heap page 0,
     524                 :      * incrementing by the number of pages per range; this gives us a full
     525                 :      * view of the table.
     526                 :      */
     527 GIC       95217 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
     528 ECB             :     {
     529                 :         bool        addrange;
     530 GIC       93927 :         bool        gottuple = false;
     531 ECB             :         BrinTuple  *tup;
     532                 :         OffsetNumber off;
     533                 :         Size        size;
     534                 : 
     535 GIC       93927 :         CHECK_FOR_INTERRUPTS();
     536 ECB             : 
     537 GIC       93927 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
     538 ECB             : 
     539 GIC       93927 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
     540 ECB             :                                        &off, &size, BUFFER_LOCK_SHARE,
     541                 :                                        scan->xs_snapshot);
     542 GIC       93927 :         if (tup)
     543 ECB             :         {
     544 GIC       93927 :             gottuple = true;
     545 CBC       93927 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);
     546           93927 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     547 ECB             :         }
     548                 : 
     549                 :         /*
     550                 :          * For page ranges with no indexed tuple, we must return the whole
     551                 :          * range; otherwise, compare it to the scan keys.
     552                 :          */
     553 GIC       93927 :         if (!gottuple)
     554 ECB             :         {
     555 UIC           0 :             addrange = true;
     556 EUB             :         }
     557                 :         else
     558                 :         {
     559 GIC       93927 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
     560 CBC       93927 :             if (dtup->bt_placeholder)
     561 ECB             :             {
     562                 :                 /*
     563                 :                  * Placeholder tuples are always returned, regardless of the
     564                 :                  * values stored in them.
     565                 :                  */
     566 UIC           0 :                 addrange = true;
     567 EUB             :             }
     568                 :             else
     569                 :             {
     570                 :                 int         attno;
     571                 : 
     572                 :                 /*
     573                 :                  * Compare scan keys with summary values stored for the range.
     574                 :                  * If scan keys are matched, the page range must be added to
     575                 :                  * the bitmap.  We initially assume the range needs to be
     576                 :                  * added; in particular this serves the case where there are
     577                 :                  * no keys.
     578                 :                  */
     579 GIC       93927 :                 addrange = true;
     580 CBC     2350500 :                 for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
     581 ECB             :                 {
     582                 :                     BrinValues *bval;
     583                 :                     Datum       add;
     584                 :                     Oid         collation;
     585                 : 
     586                 :                     /*
     587                 :                      * skip attributes without any scan keys (both regular and
     588                 :                      * IS [NOT] NULL)
     589                 :                      */
     590 GIC     2282826 :                     if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
     591 CBC     2188899 :                         continue;
     592 ECB             : 
     593 GIC       93927 :                     bval = &dtup->bt_columns[attno - 1];
     594 ECB             : 
     595                 :                     /*
     596                 :                      * First check if there are any IS [NOT] NULL scan keys,
     597                 :                      * and if we're violating them. In that case we can
     598                 :                      * terminate early, without invoking the support function.
     599                 :                      *
     600                 :                      * As there may be more keys, we can only determine
     601                 :                      * mismatch within this loop.
     602                 :                      */
     603 GIC       93927 :                     if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
     604 CBC       93927 :                         !check_null_keys(bval, nullkeys[attno - 1],
     605           93927 :                                          nnullkeys[attno - 1]))
     606 ECB             :                     {
     607                 :                         /*
     608                 :                          * If any of the IS [NOT] NULL keys failed, the page
     609                 :                          * range as a whole can't pass. So terminate the loop.
     610                 :                          */
     611 GIC         498 :                         addrange = false;
     612 CBC         498 :                         break;
     613 ECB             :                     }
     614                 : 
     615                 :                     /*
     616                 :                      * So either there are no IS [NOT] NULL keys, or all
     617                 :                      * passed. If there are no regular scan keys, we're done -
     618                 :                      * the page range matches. If there are regular keys, but
     619                 :                      * the page range is marked as 'all nulls' it can't
     620                 :                      * possibly pass (we're assuming the operators are
     621                 :                      * strict).
     622                 :                      */
     623                 : 
     624                 :                     /* No regular scan keys - page range as a whole passes. */
     625 GIC       93429 :                     if (!nkeys[attno - 1])
     626 CBC         618 :                         continue;
     627 ECB             : 
     628 GIC       92811 :                     Assert((nkeys[attno - 1] > 0) &&
     629 ECB             :                            (nkeys[attno - 1] <= scan->numberOfKeys));
     630                 : 
     631                 :                     /* If it is all nulls, it cannot possibly be consistent. */
     632 GIC       92811 :                     if (bval->bv_allnulls)
     633 ECB             :                     {
     634 GIC         189 :                         addrange = false;
     635 CBC         189 :                         break;
     636 ECB             :                     }
     637                 : 
     638                 :                     /*
     639                 :                      * Collation from the first key (has to be the same for
     640                 :                      * all keys for the same attribute).
     641                 :                      */
     642 GIC       92622 :                     collation = keys[attno - 1][0]->sk_collation;
     643 ECB             : 
     644                 :                     /*
     645                 :                      * Check whether the scan key is consistent with the page
     646                 :                      * range values; if so, have the pages in the range added
     647                 :                      * to the output bitmap.
     648                 :                      *
     649                 :                      * The opclass may or may not support processing of
     650                 :                      * multiple scan keys. We can determine that based on the
     651                 :                      * number of arguments - functions with extra parameter
     652                 :                      * (number of scan keys) do support this, otherwise we
     653                 :                      * have to simply pass the scan keys one by one.
     654                 :                      */
     655 GIC       92622 :                     if (consistentFn[attno - 1].fn_nargs >= 4)
     656 ECB             :                     {
     657                 :                         /* Check all keys at once */
     658 GIC       18756 :                         add = FunctionCall4Coll(&consistentFn[attno - 1],
     659 ECB             :                                                 collation,
     660                 :                                                 PointerGetDatum(bdesc),
     661                 :                                                 PointerGetDatum(bval),
     662 GIC       18756 :                                                 PointerGetDatum(keys[attno - 1]),
     663 CBC       18756 :                                                 Int32GetDatum(nkeys[attno - 1]));
     664           18756 :                         addrange = DatumGetBool(add);
     665 ECB             :                     }
     666                 :                     else
     667                 :                     {
     668                 :                         /*
     669                 :                          * Check keys one by one
     670                 :                          *
     671                 :                          * When there are multiple scan keys, failure to meet
     672                 :                          * the criteria for a single one of them is enough to
     673                 :                          * discard the range as a whole, so break out of the
     674                 :                          * loop as soon as a false return value is obtained.
     675                 :                          */
     676                 :                         int         keyno;
     677                 : 
     678 GIC      129039 :                         for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
     679 ECB             :                         {
     680 GIC       73866 :                             add = FunctionCall3Coll(&consistentFn[attno - 1],
     681 CBC       73866 :                                                     keys[attno - 1][keyno]->sk_collation,
     682 ECB             :                                                     PointerGetDatum(bdesc),
     683                 :                                                     PointerGetDatum(bval),
     684 GIC       73866 :                                                     PointerGetDatum(keys[attno - 1][keyno]));
     685 CBC       73866 :                             addrange = DatumGetBool(add);
     686           73866 :                             if (!addrange)
     687           18693 :                                 break;
     688 ECB             :                         }
     689                 :                     }
     690                 : 
     691                 :                     /*
     692                 :                      * If we found a scan key eliminating the range, no need to
     693                 :                      * check additional ones.
     694                 :                      */
     695 GIC       92622 :                     if (!addrange)
     696 CBC       25566 :                         break;
     697 ECB             :                 }
     698                 :             }
     699                 :         }
     700                 : 
     701                 :         /* add the pages in the range to the output bitmap, if needed */
     702 GIC       93927 :         if (addrange)
     703 ECB             :         {
     704                 :             BlockNumber pageno;
     705                 : 
     706 GIC       67674 :             for (pageno = heapBlk;
     707 CBC      135348 :                  pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
     708           67674 :                  pageno++)
     709 ECB             :             {
     710 GIC       67674 :                 MemoryContextSwitchTo(oldcxt);
     711 CBC       67674 :                 tbm_add_page(tbm, pageno);
     712           67674 :                 totalpages++;
     713           67674 :                 MemoryContextSwitchTo(perRangeCxt);
     714 ECB             :             }
     715                 :         }
     716                 :     }
     717                 : 
     718 GIC        1290 :     MemoryContextSwitchTo(oldcxt);
     719 CBC        1290 :     MemoryContextDelete(perRangeCxt);
     720 ECB             : 
     721 GIC        1290 :     if (buf != InvalidBuffer)
     722 CBC        1290 :         ReleaseBuffer(buf);
     723 ECB             : 
     724                 :     /*
     725                 :      * XXX We have an approximation of the number of *pages* that our scan
     726                 :      * returns, but we don't have a precise idea of the number of heap tuples
     727                 :      * involved.
     728                 :      */
     729 GIC        1290 :     return totalpages * 10;
     730 ECB             : }
     731                 : 
     732                 : /*
     733                 :  * Re-initialize state for a BRIN index scan
                         :  *
                         :  * Copies scan->numberOfKeys ScanKeyData entries from 'scankey' into
                         :  * scan->keyData, provided 'scankey' is non-NULL and the key count is
                         :  * positive; otherwise nothing is done.  The nscankeys, orderbys and
                         :  * norderbys arguments are not referenced by this function.
     734                 :  */
     735                 : void
     736 GIC        1290 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
     737 ECB             :            ScanKey orderbys, int norderbys)
     738                 : {
     739                 :     /*
     740                 :      * Other index AMs preprocess the scan keys at this point, or sometime
     741                 :      * early during the scan; this lets them optimize by removing redundant
     742                 :      * keys, or doing early returns when they are impossible to satisfy; see
     743                 :      * _bt_preprocess_keys for an example.  Something like that could be added
     744                 :      * here someday, too.
                         :      *
                         :      * For now the keys are taken verbatim.  memmove (rather than memcpy) is
                         :      * used for the copy, so overlapping source and destination are
                         :      * tolerated.
     745                 :      */
     746                 : 
     747 GIC        1290 :     if (scankey && scan->numberOfKeys > 0)
     748 CBC        1290 :         memmove(scan->keyData, scankey,
     749            1290 :                 scan->numberOfKeys * sizeof(ScanKeyData));
     750            1290 : }
     751 ECB             : 
     752                 : /*
     753                 :  * Close down a BRIN index scan
                         :  *
                         :  * Tears down the per-scan state hanging off scan->opaque: terminates the
                         :  * revmap access object (bo_rmAccess), frees the BRIN descriptor (bo_bdesc),
                         :  * and finally pfree's the BrinOpaque struct itself.  The opaque struct is
                         :  * released last because the two preceding calls read fields from it.
     754                 :  */
     755                 : void
     756 GIC        1290 : brinendscan(IndexScanDesc scan)
     757 ECB             : {
     758 GIC        1290 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
     759 ECB             : 
     760 GIC        1290 :     brinRevmapTerminate(opaque->bo_rmAccess);
     761 CBC        1290 :     brin_free_desc(opaque->bo_bdesc);
     762            1290 :     pfree(opaque);
     763            1290 : }
     764 ECB             : 
     765                 : /*
     766                 :  * Per-heap-tuple callback for table_index_build_scan.
     767                 :  *
     768                 :  * Note we don't worry about the page range at the end of the table here; it is
     769                 :  * present in the build state struct after we're called the last time, but not
     770                 :  * inserted into the index.  Caller must ensure to do so, if appropriate.
                         :  *
                         :  * 'brstate' is the BrinBuildState being accumulated.  'tid' supplies the
                         :  * heap block number of the current tuple; 'values' and 'isnull' are handed
                         :  * to add_values_to_range() together with the index relation.  The
                         :  * tupleIsAlive argument is not referenced here.
     771                 :  */
     772                 : static void
     773 GIC      346317 : brinbuildCallback(Relation index,
     774 ECB             :                   ItemPointer tid,
     775                 :                   Datum *values,
     776                 :                   bool *isnull,
     777                 :                   bool tupleIsAlive,
     778                 :                   void *brstate)
     779                 : {
     780 GIC      346317 :     BrinBuildState *state = (BrinBuildState *) brstate;
     781 ECB             :     BlockNumber thisblock;
     782                 : 
     783 GIC      346317 :     thisblock = ItemPointerGetBlockNumber(tid);
     784 ECB             : 
     785                 :     /*
     786                 :      * If we're in a block that belongs to a future range, summarize what
     787                 :      * we've got and start afresh.  Note the scan might have skipped many
     788                 :      * pages, if they were devoid of live tuples; make sure to insert index
     789                 :      * tuples for those too.
                         :      *
                         :      * Each iteration emits one index tuple and advances bs_currRangeStart by
                         :      * bs_pagesPerRange, so the loop repeats until the current range covers
                         :      * thisblock.  The second number in the debug message is
                         :      * bs_currRangeStart + bs_pagesPerRange, i.e. the first block past the
                         :      * completed range rather than its last block.
                         :      *
                         :      * NOTE(review): the types of bs_currRangeStart and bs_pagesPerRange are
                         :      * not visible here; if the sum is computed in 32-bit BlockNumber
                         :      * arithmetic it could wrap for tables close to the maximum block count
                         :      * -- confirm.
     790                 :      */
     791 GIC      347322 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
     792 ECB             :     {
     793                 : 
     794                 :         BRIN_elog((DEBUG2,
     795                 :                    "brinbuildCallback: completed a range: %u--%u",
     796                 :                    state->bs_currRangeStart,
     797                 :                    state->bs_currRangeStart + state->bs_pagesPerRange));
     798                 : 
     799                 :         /* create the index tuple and insert it */
     800 GIC        1005 :         form_and_insert_tuple(state);
     801 ECB             : 
     802                 :         /* set state to correspond to the next range */
     803 GIC        1005 :         state->bs_currRangeStart += state->bs_pagesPerRange;
     804 ECB             : 
     805                 :         /* re-initialize state for it */
     806 GIC        1005 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
     807 ECB             :     }
     808                 : 
     809                 :     /* Accumulate the current tuple into the running state; the return value is deliberately ignored */
     810 GIC      346317 :     (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
     811 ECB             :                                values, isnull);
     812 GIC      346317 : }
     813 ECB             : 
     814                 : /*
     815                 :  * brinbuild() -- build a new BRIN index.
                         :  *
                         :  * Steps, in order:
                         :  *   1. Raise an error if the index relation already has any blocks.
                         :  *   2. Extend the main fork by one page, initialize it as the BRIN
                         :  *      metapage, and emit an XLOG_BRIN_CREATE_INDEX record when the
                         :  *      relation needs WAL.
                         :  *   3. Set up the revmap and the build state.
                         :  *   4. Scan the heap with table_index_build_scan(), feeding every tuple to
                         :  *      brinbuildCallback.
                         :  *   5. Insert the index tuple for the last range still held in the build
                         :  *      state, then release the revmap and build state.
                         :  *
                         :  * Returns a palloc'd IndexBuildResult whose heap_tuples is the value
                         :  * returned by table_index_build_scan() and whose index_tuples is the
                         :  * build state's bs_numtuples.
     816                 :  */
     817                 : IndexBuildResult *
     818 GIC         119 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
     819 ECB             : {
     820                 :     IndexBuildResult *result;
     821                 :     double      reltuples;
     822                 :     double      idxtuples;
     823                 :     BrinRevmap *revmap;
     824                 :     BrinBuildState *state;
     825                 :     Buffer      meta;
     826                 :     BlockNumber pagesPerRange;
     827                 : 
     828                 :     /*
     829                 :      * We expect to be called exactly once for any index relation.
     830                 :      */
     831 GIC         119 :     if (RelationGetNumberOfBlocks(index) != 0)
     832 LBC           0 :         elog(ERROR, "index \"%s\" already contains data",
     833 EUB             :              RelationGetRelationName(index));
     834                 : 
     835                 :     /*
     836                 :      * Critical section not required, because on error the creation of the
     837                 :      * whole relation will be rolled back.
                         :      *
                         :      * The page obtained here is asserted to be block BRIN_METAPAGE_BLKNO,
                         :      * which holds because the relation was just checked to be empty.
     838                 :      */
     839                 : 
     840 GNC         119 :     meta = ExtendBufferedRel(EB_REL(index), MAIN_FORKNUM, NULL,
     841                 :                              EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
     842 CBC         119 :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
     843 ECB             : 
     844 GIC         119 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
     845 ECB             :                        BRIN_CURRENT_VERSION);
     846 GIC         119 :     MarkBufferDirty(meta);
     847 ECB             : 
     848 GIC         119 :     if (RelationNeedsWAL(index))
     849 ECB             :     {
     850                 :         xl_brin_createidx xlrec;
     851                 :         XLogRecPtr  recptr;
     852                 :         Page        page;
     853                 : 
     854 GIC          51 :         xlrec.version = BRIN_CURRENT_VERSION;
     855 CBC          51 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
     856 ECB             : 
     857 GIC          51 :         XLogBeginInsert();
     858 CBC          51 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
     859              51 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
     860 ECB             : 
     861 GIC          51 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
     862 ECB             : 
     863 GIC          51 :         page = BufferGetPage(meta);
     864 CBC          51 :         PageSetLSN(page, recptr);
     865 ECB             :     }
     866                 : 
     867 GIC         119 :     UnlockReleaseBuffer(meta);
     868 ECB             : 
     869                 :     /*
     870                 :      * Initialize our state, including the deformed tuple state.
                         :      * pagesPerRange is filled in by brinRevmapInitialize() and then passed
                         :      * on to the build state.
     871                 :      */
     872 GIC         119 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
     873 CBC         119 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
     874 ECB             : 
     875                 :     /*
     876                 :      * Now scan the relation.  No syncscan allowed here because we want the
     877                 :      * heap blocks in physical order.
     878                 :      */
     879 GIC         119 :     reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
     880 ECB             :                                        brinbuildCallback, (void *) state, NULL);
     881                 : 
     882                 :     /* process the final batch: the callback never inserts the last range itself */
     883 GIC         119 :     form_and_insert_tuple(state);
     884 ECB             : 
     885                 :     /* release resources; read the tuple count first, before the state is torn down */
     886 GIC         119 :     idxtuples = state->bs_numtuples;
     887 CBC         119 :     brinRevmapTerminate(state->bs_rmAccess);
     888             119 :     terminate_brin_buildstate(state);
     889 ECB             : 
     890                 :     /*
     891                 :      * Return statistics
     892                 :      */
     893 GNC         119 :     result = palloc_object(IndexBuildResult);
     894 ECB             : 
     895 GIC         119 :     result->heap_tuples = reltuples;
     896 CBC         119 :     result->index_tuples = idxtuples;
     897 ECB             : 
     898 GIC         119 :     return result;
     899 ECB             : }
     900                 : 
     901                 : void
     902 GIC           3 : brinbuildempty(Relation index)
     903 ECB             : {
     904                 :     Buffer      metabuf;
     905                 : 
     906                 :     /*
                         :      * brinbuildempty: create the empty form of a BRIN index in the
                         :      * relation's init fork (INIT_FORKNUM).
                         :      *
                         :      * An empty BRIN index has a metapage only.
                         :      *
                         :      * NOTE(review): EB_LOCK_FIRST presumably hands the new buffer back
                         :      * already locked, which is what the UnlockReleaseBuffer() call at the
                         :      * end of this function undoes -- confirm against ExtendBufferedRel.
                         :      */
     907 GNC           3 :     metabuf = ExtendBufferedRel(EB_REL(index), INIT_FORKNUM, NULL,
     908                 :                                 EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
     909                 : 
     910                 :     /*
                         :      * Initialize and xlog metabuffer.  Page initialization, dirty-marking
                         :      * and the log_newpage_buffer() call all happen inside one critical
                         :      * section.
                         :      */
     911 CBC           3 :     START_CRIT_SECTION();
     912               3 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
     913                 :                        BRIN_CURRENT_VERSION);
     914               3 :     MarkBufferDirty(metabuf);
     915               3 :     log_newpage_buffer(metabuf, true);
     916               3 :     END_CRIT_SECTION();
     917                 : 
     918               3 :     UnlockReleaseBuffer(metabuf);
     919               3 : }
     920                 : 
     921                 : /*
     922                 :  * brinbulkdelete
     923                 :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
     924                 :  *      there's not a lot we can do here.
     925                 :  *
     926                 :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
     927                 :  * tuple is deleted), meaning the need to re-run summarization on the affected
     928                 :  * range.  Would need to add an extra flag in brintuples for that.
                         :  *
                         :  * The info, callback and callback_state arguments are not referenced.
                         :  * Returns 'stats' unchanged when the caller supplied one; otherwise returns
                         :  * a freshly allocated, zero-filled IndexBulkDeleteResult.
     929                 :  */
     930                 : IndexBulkDeleteResult *
     931               8 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     932                 :                IndexBulkDeleteCallback callback, void *callback_state)
     933                 : {
     934                 :     /* allocate stats if first time through, else re-use existing struct */
     935               8 :     if (stats == NULL)
     936 GNC           8 :         stats = palloc0_object(IndexBulkDeleteResult);
     937                 : 
     938 CBC           8 :     return stats;
     939                 : }
     940                 : 
     941                 : /*
     942                 :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
     943                 :  * ranges that are currently unsummarized.
                         :  *
                         :  * When info->analyze_only is set, 'stats' is returned untouched (it may be
                         :  * NULL).  Otherwise a zero-filled stats struct is allocated if none was
                         :  * passed in, num_pages is set to the index's current block count, and the
                         :  * parent heap is opened with AccessShareLock for the duration of
                         :  * brin_vacuum_scan() and brinsummarize().
                         :  *
                         :  * NOTE(review): both count output arguments of brinsummarize() point at
                         :  * stats->num_index_tuples.  Presumably this is intentional, so that both
                         :  * counts accumulate into the one field -- confirm against brinsummarize().
     944                 :  */
     945                 : IndexBulkDeleteResult *
     946              43 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     947                 : {
     948                 :     Relation    heapRel;
     949                 : 
     950                 :     /* No-op in ANALYZE ONLY mode */
     951              43 :     if (info->analyze_only)
     952               1 :         return stats;
     953                 : 
     954              42 :     if (!stats)
     955 GNC          34 :         stats = palloc0_object(IndexBulkDeleteResult);
     956 CBC          42 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
     957                 :     /* rest of stats is initialized by zeroing (only when allocated just above) */
     958                 : 
     959              42 :     heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
     960                 :                          AccessShareLock);
     961                 : 
     962              42 :     brin_vacuum_scan(info->index, info->strategy);
     963                 : 
     964              42 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
     965                 :                   &stats->num_index_tuples, &stats->num_index_tuples);
     966                 : 
     967              42 :     table_close(heapRel, AccessShareLock);
     968                 : 
     969              42 :     return stats;
     970                 : }
     971                 : 
     972                 : /*
     973                 :  * reloptions processor for BRIN indexes
                         :  *
                         :  * Recognizes two options: "pages_per_range" (integer, stored in
                         :  * BrinOptions.pagesPerRange) and "autosummarize" (boolean, stored in
                         :  * BrinOptions.autosummarize).  Parsing is delegated to build_reloptions()
                         :  * with kind RELOPT_KIND_BRIN; 'validate' is passed through to it unchanged.
                         :  * The result of build_reloptions() is returned, cast to bytea *.
     974                 :  */
     975                 : bytea *
     976             264 : brinoptions(Datum reloptions, bool validate)
     977                 : {
     978                 :     static const relopt_parse_elt tab[] = {
     979                 :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
     980                 :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     981                 :     };
     982                 : 
     983             264 :     return (bytea *) build_reloptions(reloptions, validate,
     984                 :                                       RELOPT_KIND_BRIN,
     985                 :                                       sizeof(BrinOptions),
     986                 :                                       tab, lengthof(tab));
     987                 : }
     988                 : 
     989                 : /*
     990                 :  * SQL-callable function to scan through an index and summarize all ranges
     991                 :  * that are not currently summarized.
                         :  *
                         :  * This is a thin wrapper: it forwards its first argument, untouched, to
                         :  * brin_summarize_range() via DirectFunctionCall2, passing
                         :  * BRIN_ALL_BLOCKRANGES (as an int64 datum) as the block number, and returns
                         :  * whatever that call returns.
     992                 :  */
     993                 : Datum
     994              38 : brin_summarize_new_values(PG_FUNCTION_ARGS)
     995                 : {
     996              38 :     Datum       relation = PG_GETARG_DATUM(0);
     997                 : 
     998              38 :     return DirectFunctionCall2(brin_summarize_range,
     999                 :                                relation,
    1000                 :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
    1001                 : }
    1002                 : 
    1003                 : /*
    1004                 :  * SQL-callable function to summarize the page range containing the given
    1005                 :  * heap block, if not already summarized.  If the second argument is
    1006                 :  * BRIN_ALL_BLOCKRANGES, all unsummarized ranges are summarized.  Returns the number of ranges summarized.
    1007                 :  */
    1008                 : Datum
    1009             101 : brin_summarize_range(PG_FUNCTION_ARGS)
    1010                 : {
    1011             101 :     Oid         indexoid = PG_GETARG_OID(0);
    1012             101 :     int64       heapBlk64 = PG_GETARG_INT64(1);
    1013                 :     BlockNumber heapBlk;
    1014                 :     Oid         heapoid;
    1015                 :     Relation    indexRel;
    1016                 :     Relation    heapRel;
    1017                 :     Oid         save_userid;
    1018                 :     int         save_sec_context;
    1019                 :     int         save_nestlevel;
    1020             101 :     double      numSummarized = 0;
    1021                 : 
    1022             101 :     if (RecoveryInProgress())
    1023 UBC           0 :         ereport(ERROR,
    1024                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1025                 :                  errmsg("recovery is in progress"),
    1026                 :                  errhint("BRIN control functions cannot be executed during recovery.")));
    1027                 : 
    1028 CBC         101 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
    1029              18 :         ereport(ERROR,
    1030                 :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
    1031                 :                  errmsg("block number out of range: %lld",
    1032                 :                         (long long) heapBlk64)));
    1033              83 :     heapBlk = (BlockNumber) heapBlk64;  /* narrowing is safe: range-checked just above */
    1034                 : 
    1035                 :     /*
    1036                 :      * We must lock table before index to avoid deadlocks.  However, if the
    1037                 :      * passed indexoid isn't an index then IndexGetRelation() will fail.
    1038                 :      * Rather than emitting a not-very-helpful error message, postpone
    1039                 :      * complaining, expecting that the is-it-an-index test below will fail.
    1040                 :      */
    1041              83 :     heapoid = IndexGetRelation(indexoid, true);
    1042              83 :     if (OidIsValid(heapoid))
    1043                 :     {
    1044              74 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
    1045                 : 
    1046                 :         /*
    1047                 :          * Autovacuum calls us.  For its benefit, switch to the table owner's
    1048                 :          * userid, so that any index functions are run as that user.  Also
    1049                 :          * lock down security-restricted operations and arrange to make GUC
    1050                 :          * variable changes local to this command.  This is harmless, albeit
    1051                 :          * unnecessary, when called from SQL, because we fail shortly if the
    1052                 :          * user does not own the index.
    1053                 :          */
    1054              74 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
    1055              74 :         SetUserIdAndSecContext(heapRel->rd_rel->relowner,
    1056                 :                                save_sec_context | SECURITY_RESTRICTED_OPERATION);
    1057              74 :         save_nestlevel = NewGUCNestLevel();
    1058                 :     }
    1059                 :     else
    1060                 :     {
    1061               9 :         heapRel = NULL;
    1062                 :         /* Set these just to suppress "uninitialized variable" warnings */
    1063               9 :         save_userid = InvalidOid;
    1064               9 :         save_sec_context = -1;
    1065               9 :         save_nestlevel = -1;
    1066                 :     }
    1067                 : 
    1068              83 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
    1069                 : 
    1070                 :     /* Must be a BRIN index */
    1071              74 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1072              74 :         indexRel->rd_rel->relam != BRIN_AM_OID)
    1073               9 :         ereport(ERROR,
    1074                 :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1075                 :                  errmsg("\"%s\" is not a BRIN index",
    1076                 :                         RelationGetRelationName(indexRel))));
    1077                 : 
    1078                 :     /* Original caller (save_userid, captured before the switch above) must own the index (comparable to privileges needed for VACUUM) */
    1079 GNC          65 :     if (heapRel != NULL && !object_ownercheck(RelationRelationId, indexoid, save_userid))
    1080 UBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1081               0 :                        RelationGetRelationName(indexRel));
    1082                 : 
    1083                 :     /*
    1084                 :      * Since we did the IndexGetRelation call above without any lock, it's
    1085                 :      * barely possible that a race against an index drop/recreation could have
    1086                 :      * netted us the wrong table.  Recheck.
    1087                 :      */
    1088 CBC          65 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
    1089 UBC           0 :         ereport(ERROR,
    1090                 :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1091                 :                  errmsg("could not open parent table of index \"%s\"",
    1092                 :                         RelationGetRelationName(indexRel))));
    1093                 : 
    1094                 :     /* OK, do it (include_partial = true; existing-range count not requested) */
    1095 CBC          65 :     brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
    1096                 : 
    1097                 :     /* Roll back any GUC changes executed by index functions */
    1098              65 :     AtEOXact_GUC(false, save_nestlevel);
    1099                 : 
    1100                 :     /* Restore userid and security context */
    1101              65 :     SetUserIdAndSecContext(save_userid, save_sec_context);
    1102                 : 
    1103              65 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1104              65 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1105                 : 
    1106              65 :     PG_RETURN_INT32((int32) numSummarized);     /* count of ranges summarized by this call */
    1107                 : }
    1108                 : 
    1109                 : /*
    1110                 :  * SQL-callable interface to mark a range as no longer summarized; returns void
    1111                 :  */
    1112                 : Datum
    1113              51 : brin_desummarize_range(PG_FUNCTION_ARGS)
    1114                 : {
    1115              51 :     Oid         indexoid = PG_GETARG_OID(0);
    1116              51 :     int64       heapBlk64 = PG_GETARG_INT64(1);
    1117                 :     BlockNumber heapBlk;
    1118                 :     Oid         heapoid;
    1119                 :     Relation    heapRel;
    1120                 :     Relation    indexRel;
    1121                 :     bool        done;
    1122                 : 
    1123              51 :     if (RecoveryInProgress())
    1124 UBC           0 :         ereport(ERROR,
    1125                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1126                 :                  errmsg("recovery is in progress"),
    1127                 :                  errhint("BRIN control functions cannot be executed during recovery.")));
    1128                 : 
    1129 CBC          51 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
    1130               9 :         ereport(ERROR,
    1131                 :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
    1132                 :                  errmsg("block number out of range: %lld",
    1133                 :                         (long long) heapBlk64)));
    1134              42 :     heapBlk = (BlockNumber) heapBlk64;  /* narrowing is safe: range-checked just above */
    1135                 : 
    1136                 :     /*
    1137                 :      * We must lock table before index to avoid deadlocks.  However, if the
    1138                 :      * passed indexoid isn't an index then IndexGetRelation() will fail.
    1139                 :      * Rather than emitting a not-very-helpful error message, postpone
    1140                 :      * complaining, expecting that the is-it-an-index test below will fail.
    1141                 :      *
    1142                 :      * Unlike brin_summarize_range(), autovacuum never calls this.  Hence, we
    1143                 :      * don't switch userid.
    1144                 :      */
    1145              42 :     heapoid = IndexGetRelation(indexoid, true);
    1146              42 :     if (OidIsValid(heapoid))
    1147              42 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
    1148                 :     else
    1149 UBC           0 :         heapRel = NULL;
    1150                 : 
    1151 CBC          42 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
    1152                 : 
    1153                 :     /* Must be a BRIN index */
    1154              42 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1155              42 :         indexRel->rd_rel->relam != BRIN_AM_OID)
    1156 UBC           0 :         ereport(ERROR,
    1157                 :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1158                 :                  errmsg("\"%s\" is not a BRIN index",
    1159                 :                         RelationGetRelationName(indexRel))));
    1160                 : 
    1161                 :     /* Current user must own the index (comparable to privileges needed for VACUUM) */
    1162 GNC          42 :     if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
    1163 UBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1164               0 :                        RelationGetRelationName(indexRel));
    1165                 : 
    1166                 :     /*
    1167                 :      * Since we did the IndexGetRelation call above without any lock, it's
    1168                 :      * barely possible that a race against an index drop/recreation could have
    1169                 :      * netted us the wrong table.  Recheck.
    1170                 :      */
    1171 CBC          42 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
    1172 UBC           0 :         ereport(ERROR,
    1173                 :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1174                 :                  errmsg("could not open parent table of index \"%s\"",
    1175                 :                         RelationGetRelationName(indexRel))));
    1176                 : 
    1177                 :     /* the revmap does the hard work; keep retrying until it reports it is done */
    1178                 :     do
    1179                 :     {
    1180 CBC          42 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
    1181                 :     }
    1182              42 :     while (!done);
    1183                 : 
    1184              42 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1185              42 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1186                 : 
    1187              42 :     PG_RETURN_VOID();
    1188                 : }
    1189                 : 
    1190                 : /*
    1191                 :  * Build a BrinDesc used to create or scan a BRIN index.  The result lives in its own memory context (bd_context); free it with brin_free_desc.
    1192                 :  */
    1193                 : BrinDesc *
    1194            1968 : brin_build_desc(Relation rel)
    1195                 : {
    1196                 :     BrinOpcInfo **opcinfo;
    1197                 :     BrinDesc   *bdesc;
    1198                 :     TupleDesc   tupdesc;
    1199            1968 :     int         totalstored = 0;
    1200                 :     int         keyno;
    1201                 :     long        totalsize;
    1202                 :     MemoryContext cxt;
    1203                 :     MemoryContext oldcxt;
    1204                 : 
    1205            1968 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1206                 :                                 "brin desc cxt",
    1207                 :                                 ALLOCSET_SMALL_SIZES);
    1208            1968 :     oldcxt = MemoryContextSwitchTo(cxt);    /* allocations below, opclass callbacks included, happen in cxt */
    1209            1968 :     tupdesc = RelationGetDescr(rel);
    1210                 : 
    1211                 :     /*
    1212                 :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
    1213                 :      * the number of columns stored, since the number is opclass-defined.
    1214                 :      */
    1215 GNC        1968 :     opcinfo = palloc_array(BrinOpcInfo*, tupdesc->natts);
    1216 CBC       37096 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1217                 :     {
    1218                 :         FmgrInfo   *opcInfoFn;
    1219           35128 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
    1220                 : 
    1221           35128 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);    /* attribute numbers are 1-based here */
    1222                 : 
    1223           70256 :         opcinfo[keyno] = (BrinOpcInfo *)
    1224           35128 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
    1225           35128 :         totalstored += opcinfo[keyno]->oi_nstored;
    1226                 :     }
    1227                 : 
    1228                 :     /* Allocate our result struct (with room for natts bd_info entries) and fill it in */
    1229            1968 :     totalsize = offsetof(BrinDesc, bd_info) +
    1230            1968 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
    1231                 : 
    1232            1968 :     bdesc = palloc(totalsize);
    1233            1968 :     bdesc->bd_context = cxt;
    1234            1968 :     bdesc->bd_index = rel;
    1235            1968 :     bdesc->bd_tupdesc = tupdesc;
    1236            1968 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
    1237            1968 :     bdesc->bd_totalstored = totalstored;
    1238                 : 
    1239           37096 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1240           35128 :         bdesc->bd_info[keyno] = opcinfo[keyno];
    1241            1968 :     pfree(opcinfo);     /* only the temporary pointer array; the BrinOpcInfo structs stay referenced from bd_info */
    1242                 : 
    1243            1968 :     MemoryContextSwitchTo(oldcxt);
    1244                 : 
    1245            1968 :     return bdesc;
    1246                 : }
    1247                 : 
    1248                 : void
    1249            1458 : brin_free_desc(BrinDesc *bdesc)
    1250                 : {
    1251                 :     /* make sure the tupdesc is still valid */
    1252            1458 :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
    1253                 :     /* no need for retail pfree: brin_build_desc allocated everything, bdesc itself included, in bd_context */
    1254            1458 :     MemoryContextDelete(bdesc->bd_context);
    1255            1458 : }
    1256                 : 
    1257                 : /*
    1258                 :  * Fetch index's statistical data into *stats, reading it from the index metapage
    1259                 :  */
    1260                 : void
    1261            5178 : brinGetStats(Relation index, BrinStatsData *stats)
    1262                 : {
    1263                 :     Buffer      metabuffer;
    1264                 :     Page        metapage;
    1265                 :     BrinMetaPageData *metadata;
    1266                 : 
    1267            5178 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
    1268            5178 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);  /* share lock: we only read the metapage */
    1269            5178 :     metapage = BufferGetPage(metabuffer);
    1270            5178 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
    1271                 : 
    1272            5178 :     stats->pagesPerRange = metadata->pagesPerRange;
    1273            5178 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;   /* NOTE(review): reason for the -1 is not visible here -- confirm against the revmap layout */
    1274                 : 
    1275            5178 :     UnlockReleaseBuffer(metabuffer);
    1276            5178 : }
    1277                 : 
    1278                 : /*
    1279                 :  * Initialize a BrinBuildState appropriate to create tuples on the given index.  Release it with terminate_brin_buildstate.
    1280                 :  */
    1281                 : static BrinBuildState *
    1282             158 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    1283                 :                            BlockNumber pagesPerRange)
    1284                 : {
    1285                 :     BrinBuildState *state;
    1286                 : 
    1287 GNC         158 :     state = palloc_object(BrinBuildState);
    1288                 : 
    1289 CBC         158 :     state->bs_irel = idxRel;
    1290             158 :     state->bs_numtuples = 0;
    1291             158 :     state->bs_currentInsertBuf = InvalidBuffer;
    1292             158 :     state->bs_pagesPerRange = pagesPerRange;
    1293             158 :     state->bs_currRangeStart = 0;
    1294             158 :     state->bs_rmAccess = revmap;    /* stored as given by the caller; not created here */
    1295             158 :     state->bs_bdesc = brin_build_desc(idxRel);
    1296             158 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    1297                 : 
    1298             158 :     return state;
    1299                 : }
    1300                 : 
    1301                 : /*
    1302                 :  * Release resources associated with a BrinBuildState.  The revmap in bs_rmAccess is not touched here.
    1303                 :  */
    1304                 : static void
    1305             158 : terminate_brin_buildstate(BrinBuildState *state)
    1306                 : {
    1307                 :     /*
    1308                 :      * Release the last index buffer used.  We might as well ensure that
    1309                 :      * whatever free space remains in that page is available in FSM, too.
    1310                 :      */
    1311             158 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
    1312                 :     {
    1313                 :         Page        page;
    1314                 :         Size        freespace;
    1315                 :         BlockNumber blk;
    1316                 : 
    1317             119 :         page = BufferGetPage(state->bs_currentInsertBuf);
    1318             119 :         freespace = PageGetFreeSpace(page);
    1319             119 :         blk = BufferGetBlockNumber(state->bs_currentInsertBuf);     /* fetch page facts before the buffer is released */
    1320             119 :         ReleaseBuffer(state->bs_currentInsertBuf);
    1321             119 :         RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
    1322             119 :         FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
    1323                 :     }
    1324                 : 
    1325             158 :     brin_free_desc(state->bs_bdesc);
    1326             158 :     pfree(state->bs_dtuple);
    1327             158 :     pfree(state);
    1328             158 : }
    1329                 : 
    1330                 : /*
    1331                 :  * On the given BRIN index, summarize the heap page range that corresponds
    1332                 :  * to the heap block number given.
    1333                 :  *
    1334                 :  * This routine can run in parallel with insertions into the heap.  To avoid
    1335                 :  * missing those values from the summary tuple, we first insert a placeholder
    1336                 :  * index tuple into the index, then execute the heap scan; transactions
    1337                 :  * concurrent with the scan update the placeholder tuple.  After the scan, we
    1338                 :  * union the placeholder tuple with the one computed by this routine.  The
    1339                 :  * update of the index value happens in a loop, so that if somebody updates
    1340                 :  * the placeholder tuple after we read it, we detect the case and try again.
    1341                 :  * This ensures that the concurrently inserted tuples are not lost.
    1342                 :  *
    1343                 :  * A further corner case is this routine being asked to summarize the partial
    1344                 :  * range at the end of the table.  heapNumBlks is the (possibly outdated)
    1345                 :  * table size; if we notice that the requested range extends beyond that size,
    1346                 :  * we re-compute the table size after inserting the placeholder tuple, to
    1347                 :  * avoid missing pages that were appended recently.
    1348                 :  */
    1349                 : static void
    1350            1467 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
    1351                 :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
    1352                 : {
    1353                 :     Buffer      phbuf;
    1354                 :     BrinTuple  *phtup;
    1355                 :     Size        phsz;
    1356                 :     OffsetNumber offset;
    1357                 :     BlockNumber scanNumBlks;
    1358                 : 
    1359                 :     /*
    1360                 :      * Insert the placeholder tuple
    1361                 :      */
    1362            1467 :     phbuf = InvalidBuffer;
    1363            1467 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
    1364            1467 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
    1365                 :                            state->bs_rmAccess, &phbuf,
    1366                 :                            heapBlk, phtup, phsz);
    1367                 : 
    1368                 :     /*
    1369                 :      * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
    1370                 :      * cannot shrink concurrently (but it can grow).
    1371                 :      */
    1372            1467 :     Assert(heapBlk % state->bs_pagesPerRange == 0);
    1373            1467 :     if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
    1374                 :     {
    1375                 :         /*
    1376                 :          * If we're asked to scan what we believe to be the final range on the
    1377                 :          * table (i.e. a range that might be partial) we need to recompute our
    1378                 :          * idea of what the latest page is after inserting the placeholder
    1379                 :          * tuple.  Anyone that grows the table later will update the
    1380                 :          * placeholder tuple, so it doesn't matter that we won't scan these
    1381                 :          * pages ourselves.  Careful: the table might have been extended
    1382                 :          * beyond the current range, so clamp our result.
    1383                 :          *
    1384                 :          * Fortunately, this should occur infrequently.
    1385                 :          */
    1386              12 :         scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
    1387                 :                           state->bs_pagesPerRange);
    1388                 :     }
    1389                 :     else
    1390                 :     {
    1391                 :         /* Easy case: range is known to be complete */
    1392            1455 :         scanNumBlks = state->bs_pagesPerRange;
    1393                 :     }
    1394                 : 
    1395                 :     /*
    1396                 :      * Execute the partial heap scan covering the heap blocks in the specified
    1397                 :      * page range, summarizing the heap tuples in it.  This scan stops just
    1398                 :      * short of brinbuildCallback creating the new index entry.
    1399                 :      *
    1400                 :      * Note that it is critical we use the "any visible" mode of
    1401                 :      * table_index_build_range_scan here: otherwise, we would miss tuples
    1402                 :      * inserted by transactions that are still in progress, among other corner
    1403                 :      * cases.
    1404                 :      */
    1405            1467 :     state->bs_currRangeStart = heapBlk;
    1406            1467 :     table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
    1407                 :                                  heapBlk, scanNumBlks,
    1408                 :                                  brinbuildCallback, (void *) state, NULL);
    1409                 : 
    1410                 :     /*
    1411                 :      * Now we replace the placeholder tuple with the values obtained by the
    1412                 :      * scan.  We do this in a loop which only terminates if we're able to
    1413                 :      * update the placeholder tuple successfully; if we are not, this means
    1414                 :      * somebody else modified the placeholder tuple after we read it.
    1415                 :      */
    1416                 :     for (;;)
    1417 UBC           0 :     {
    1418                 :         BrinTuple  *newtup;
    1419                 :         Size        newsize;
    1420                 :         bool        didupdate;
    1421                 :         bool        samepage;
    1422                 : 
    1423 CBC        1467 :         CHECK_FOR_INTERRUPTS();
    1424                 : 
    1425                 :         /*
    1426                 :          * Form the summary tuple and try to replace the placeholder with it.
    1427                 :          */
    1428            1467 :         newtup = brin_form_tuple(state->bs_bdesc,
    1429                 :                                  heapBlk, state->bs_dtuple, &newsize);
    1430            1467 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
    1431                 :         didupdate =
    1432            1467 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
    1433                 :                           state->bs_rmAccess, heapBlk, phbuf, offset,
    1434                 :                           phtup, phsz, newtup, newsize, samepage);
    1435            1467 :         brin_free_tuple(phtup);
    1436            1467 :         brin_free_tuple(newtup);
    1437                 : 
    1438                 :         /* If the update succeeded, we're done. */
    1439            1467 :         if (didupdate)
    1440            1467 :             break;
    1441                 : 
    1442                 :         /*
    1443                 :          * If the update didn't work, it might be because somebody updated the
    1444                 :          * placeholder tuple concurrently.  Extract the new version, union it
    1445                 :          * with the values we have from the scan, and start over.  (There are
    1446                 :          * other reasons for the update to fail, but it's simple to treat them
    1447                 :          * the same.)
    1448                 :          */
    1449 UBC           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
    1450                 :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
    1451                 :                                          NULL);
    1452                 :         /* the placeholder tuple must exist */
    1453               0 :         if (phtup == NULL)
    1454               0 :             elog(ERROR, "missing placeholder tuple");
    1455               0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);   /* private copy, taken before the buffer lock is dropped */
    1456               0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
    1457                 : 
    1458                 :         /* merge it into the tuple from the heap scan */
    1459               0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
    1460                 :     }
    1461                 : 
    1462 CBC        1467 :     ReleaseBuffer(phbuf);
    1463            1467 : }
    1464                 : 
    1465                 : /*
    1466                 :  * Summarize page ranges that are not already summarized.  If pageRange is
    1467                 :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
    1468                 :  * page range containing the given heap page number is scanned.
    1469                 :  * If include_partial is true, then the partial range at the end of the table
    1470                 :  * is summarized, otherwise not.
    1471                 :  *
    1472                 :  * For each new index tuple inserted, *numSummarized (if not NULL) is
    1473                 :  * incremented; for each existing tuple, *numExisting (if not NULL) is
    1474                 :  * incremented.
    1475                 :  */
    1476                 : static void
    1477             107 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
    1478                 :               bool include_partial, double *numSummarized, double *numExisting)
    1479                 : {
    1480                 :     BrinRevmap *revmap;
    1481             107 :     BrinBuildState *state = NULL;
    1482             107 :     IndexInfo  *indexInfo = NULL;
    1483                 :     BlockNumber heapNumBlocks;
    1484                 :     BlockNumber pagesPerRange;
    1485                 :     Buffer      buf;
    1486                 :     BlockNumber startBlk;
    1487                 : 
    1488             107 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
    1489                 : 
    1490                 :     /* determine range of pages to process (a single requested range is rounded down to its start and clamped to its end) */
    1491             107 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
    1492             107 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
    1493              71 :         startBlk = 0;
    1494                 :     else
    1495                 :     {
    1496              36 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange;
    1497              36 :         heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
    1498                 :     }
    1499             107 :     if (startBlk > heapNumBlocks)
    1500                 :     {
    1501                 :         /* Nothing to do if start point is beyond end of table */
    1502 UBC           0 :         brinRevmapTerminate(revmap);
    1503               0 :         return;
    1504                 :     }
    1505                 : 
    1506                 :     /*
    1507                 :      * Scan the revmap to find unsummarized items.
    1508                 :      */
    1509 CBC         107 :     buf = InvalidBuffer;
    1510            9469 :     for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
    1511                 :     {
    1512                 :         BrinTuple  *tup;
    1513                 :         OffsetNumber off;
    1514                 : 
    1515                 :         /*
    1516                 :          * Unless requested to summarize even a partial range, go away now if
    1517                 :          * we think the next range is partial.  Callers pass true when this is
    1518                 :          * typically run once bulk data loading is done
    1519                 :          * (brin_summarize_new_values), and false when it is typically the
    1520                 :          * result of an arbitrarily-scheduled maintenance command (vacuuming).
    1521                 :          */
    1522            9394 :         if (!include_partial &&
    1523            1024 :             (startBlk + pagesPerRange > heapNumBlocks))
    1524              32 :             break;
    1525                 : 
    1526            9362 :         CHECK_FOR_INTERRUPTS();
    1527                 : 
    1528            9362 :         tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
    1529                 :                                        BUFFER_LOCK_SHARE, NULL);
    1530            9362 :         if (tup == NULL)
    1531                 :         {
    1532                 :             /* no revmap entry for this heap range. Summarize it. */
    1533            1467 :             if (state == NULL)
    1534                 :             {
    1535                 :                 /* first time through: build state lazily, only once a range actually needs summarizing */
    1536              39 :                 Assert(!indexInfo);
    1537              39 :                 state = initialize_brin_buildstate(index, revmap,
    1538                 :                                                    pagesPerRange);
    1539              39 :                 indexInfo = BuildIndexInfo(index);
    1540                 :             }
    1541            1467 :             summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
    1542                 : 
    1543                 :             /* and re-initialize state for the next range */
    1544            1467 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1545                 : 
    1546            1467 :             if (numSummarized)
    1547            1467 :                 *numSummarized += 1.0;
    1548                 :         }
    1549                 :         else
    1550                 :         {
    1551            7895 :             if (numExisting)
    1552             946 :                 *numExisting += 1.0;
    1553            7895 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);    /* drop only the lock here; the buffer is released after the loop */
    1554                 :         }
    1555                 :     }
    1556                 : 
    1557             107 :     if (BufferIsValid(buf))
    1558              75 :         ReleaseBuffer(buf);
    1559                 : 
    1560                 :     /* free resources (state and indexInfo exist only if some range was summarized) */
    1561             107 :     brinRevmapTerminate(revmap);
    1562             107 :     if (state)
    1563                 :     {
    1564              39 :         terminate_brin_buildstate(state);
    1565              39 :         pfree(indexInfo);
    1566                 :     }
    1567                 : }
    1568                 : 
    1569                 : /*
    1570                 :  * Given a deformed tuple in the build state, convert it into the on-disk
    1571                 :  * format and insert it into the index, making the revmap point to it.
    1572                 :  */
    1573                 : static void
    1574            1124 : form_and_insert_tuple(BrinBuildState *state)
    1575                 : {
    1576                 :     BrinTuple  *tup;
    1577                 :     Size        size;
    1578                 : 
    1579            1124 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
    1580                 :                           state->bs_dtuple, &size);
    1581            1124 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
    1582                 :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
    1583                 :                   tup, size);
    1584            1124 :     state->bs_numtuples++;
    1585                 : 
    1586            1124 :     pfree(tup);
    1587            1124 : }
    1588                 : 
    1589                 : /*
    1590                 :  * Given two deformed tuples, adjust the first one so that it's consistent
    1591                 :  * with the summary values in both.
    1592                 :  */
    1593                 : static void
    1594 UBC           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
    1595                 : {
    1596                 :     int         keyno;
    1597                 :     BrinMemTuple *db;
    1598                 :     MemoryContext cxt;
    1599                 :     MemoryContext oldcxt;
    1600                 : 
    1601                 :     /* Use our own memory context to avoid retail pfree */
    1602               0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1603                 :                                 "brin union",
    1604                 :                                 ALLOCSET_DEFAULT_SIZES);
    1605               0 :     oldcxt = MemoryContextSwitchTo(cxt);
    1606               0 :     db = brin_deform_tuple(bdesc, b, NULL);
    1607               0 :     MemoryContextSwitchTo(oldcxt);
    1608                 : 
    1609               0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1610                 :     {
    1611                 :         FmgrInfo   *unionFn;
    1612               0 :         BrinValues *col_a = &a->bt_columns[keyno];
    1613               0 :         BrinValues *col_b = &db->bt_columns[keyno];
    1614               0 :         BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
    1615                 : 
    1616               0 :         if (opcinfo->oi_regular_nulls)
    1617                 :         {
    1618                 :             /* Adjust "hasnulls". */
    1619               0 :             if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
    1620               0 :                 col_a->bv_hasnulls = true;
    1621                 : 
    1622                 :             /* If there are no values in B, there's nothing left to do. */
    1623               0 :             if (col_b->bv_allnulls)
    1624               0 :                 continue;
    1625                 : 
    1626                 :             /*
    1627                 :              * Adjust "allnulls".  If A doesn't have values, just copy the
    1628                 :              * values from B into A, and we're done.  We cannot run the
    1629                 :              * operators in this case, because values in A might contain
    1630                 :              * garbage.  Note we already established that B contains values.
    1631                 :              */
    1632               0 :             if (col_a->bv_allnulls)
    1633               0 :             {
    1634                 :                 int         i;
    1635                 : 
    1636               0 :                 col_a->bv_allnulls = false;
    1637                 : 
    1638               0 :                 for (i = 0; i < opcinfo->oi_nstored; i++)
    1639               0 :                     col_a->bv_values[i] =
    1640               0 :                         datumCopy(col_b->bv_values[i],
    1641               0 :                                   opcinfo->oi_typcache[i]->typbyval,
    1642               0 :                                   opcinfo->oi_typcache[i]->typlen);
    1643                 : 
    1644               0 :                 continue;
    1645                 :             }
    1646                 :         }
    1647                 : 
    1648               0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
    1649                 :                                     BRIN_PROCNUM_UNION);
    1650               0 :         FunctionCall3Coll(unionFn,
    1651               0 :                           bdesc->bd_index->rd_indcollation[keyno],
    1652                 :                           PointerGetDatum(bdesc),
    1653                 :                           PointerGetDatum(col_a),
    1654                 :                           PointerGetDatum(col_b));
    1655                 :     }
    1656                 : 
    1657               0 :     MemoryContextDelete(cxt);
    1658               0 : }
    1659                 : 
    1660                 : /*
    1661                 :  * brin_vacuum_scan
    1662                 :  *      Do a complete scan of the index during VACUUM.
    1663                 :  *
    1664                 :  * This routine scans the complete index looking for uncatalogued index pages,
    1665                 :  * i.e. those that might have been lost due to a crash after index extension
    1666                 :  * and such.
    1667                 :  */
    1668                 : static void
    1669 CBC          42 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
    1670                 : {
    1671                 :     BlockNumber nblocks;
    1672                 :     BlockNumber blkno;
    1673                 : 
    1674                 :     /*
    1675                 :      * Scan the index in physical order, and clean up any possible mess in
    1676                 :      * each page.
    1677                 :      */
    1678              42 :     nblocks = RelationGetNumberOfBlocks(idxrel);
    1679             225 :     for (blkno = 0; blkno < nblocks; blkno++)
    1680                 :     {
    1681                 :         Buffer      buf;
    1682                 : 
    1683             183 :         CHECK_FOR_INTERRUPTS();
    1684                 : 
    1685             183 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
    1686                 :                                  RBM_NORMAL, strategy);
    1687                 : 
    1688             183 :         brin_page_cleanup(idxrel, buf);
    1689                 : 
    1690             183 :         ReleaseBuffer(buf);
    1691                 :     }
    1692                 : 
    1693                 :     /*
    1694                 :      * Update all upper pages in the index's FSM, as well.  This ensures not
    1695                 :      * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
    1696                 :      * but also that any pre-existing damage or out-of-dateness is repaired.
    1697                 :      */
    1698              42 :     FreeSpaceMapVacuum(idxrel);
    1699              42 : }
    1700                 : 
    1701                 : static bool
    1702          354407 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
    1703                 :                     Datum *values, bool *nulls)
    1704                 : {
    1705                 :     int         keyno;
    1706          354407 :     bool        modified = false;
    1707                 : 
    1708                 :     /*
    1709                 :      * Compare the key values of the new tuple to the stored index values; our
    1710                 :      * deformed tuple will get updated if the new tuple doesn't fit the
    1711                 :      * original range (note this means we can't break out of the loop early).
    1712                 :      * Make a note of whether this happens, so that we know to insert the
    1713                 :      * modified tuple later.
    1714                 :      */
    1715          782761 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1716                 :     {
    1717                 :         Datum       result;
    1718                 :         BrinValues *bval;
    1719                 :         FmgrInfo   *addValue;
    1720                 : 
    1721          428354 :         bval = &dtup->bt_columns[keyno];
    1722                 : 
    1723          428354 :         if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
    1724                 :         {
    1725                 :             /*
    1726                 :              * If the new value is null, we record that we saw it if it's the
    1727                 :              * first one; otherwise, there's nothing to do.
    1728                 :              */
    1729            6972 :             if (!bval->bv_hasnulls)
    1730                 :             {
    1731            1680 :                 bval->bv_hasnulls = true;
    1732            1680 :                 modified = true;
    1733                 :             }
    1734                 : 
    1735            6972 :             continue;
    1736                 :         }
    1737                 : 
    1738          421382 :         addValue = index_getprocinfo(idxRel, keyno + 1,
    1739                 :                                      BRIN_PROCNUM_ADDVALUE);
    1740          421382 :         result = FunctionCall4Coll(addValue,
    1741          421382 :                                    idxRel->rd_indcollation[keyno],
    1742                 :                                    PointerGetDatum(bdesc),
    1743                 :                                    PointerGetDatum(bval),
    1744          421382 :                                    values[keyno],
    1745          421382 :                                    nulls[keyno]);
    1746                 :         /* if that returned true, we need to insert the updated tuple */
    1747          421382 :         modified |= DatumGetBool(result);
    1748                 :     }
    1749                 : 
    1750          354407 :     return modified;
    1751                 : }
    1752                 : 
    1753                 : static bool
    1754           93927 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
    1755                 : {
    1756                 :     int         keyno;
    1757                 : 
    1758                 :     /*
    1759                 :      * First check if there are any IS [NOT] NULL scan keys, and if we're
    1760                 :      * violating them.
    1761                 :      */
    1762           94545 :     for (keyno = 0; keyno < nnullkeys; keyno++)
    1763                 :     {
    1764            1116 :         ScanKey     key = nullkeys[keyno];
    1765                 : 
    1766            1116 :         Assert(key->sk_attno == bval->bv_attno);
    1767                 : 
    1768                 :         /* Handle only IS NULL/IS NOT NULL tests */
    1769            1116 :         if (!(key->sk_flags & SK_ISNULL))
    1770 UBC           0 :             continue;
    1771                 : 
    1772 CBC        1116 :         if (key->sk_flags & SK_SEARCHNULL)
    1773                 :         {
    1774                 :             /* IS NULL scan key, but range has no NULLs */
    1775             558 :             if (!bval->bv_allnulls && !bval->bv_hasnulls)
    1776             489 :                 return false;
    1777                 :         }
    1778             558 :         else if (key->sk_flags & SK_SEARCHNOTNULL)
    1779                 :         {
    1780                 :             /*
    1781                 :              * For IS NOT NULL, we can only skip ranges that are known to have
    1782                 :              * only nulls.
    1783                 :              */
    1784             558 :             if (bval->bv_allnulls)
    1785               9 :                 return false;
    1786                 :         }
    1787                 :         else
    1788                 :         {
    1789                 :             /*
    1790                 :              * Neither IS NULL nor IS NOT NULL was used; assume all indexable
    1791                 :              * operators are strict and thus return false with NULL value in
    1792                 :              * the scan key.
    1793                 :              */
    1794 UBC           0 :             return false;
    1795                 :         }
    1796                 :     }
    1797                 : 
    1798 CBC       93429 :     return true;
    1799                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a