LCOV - differential code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions) Coverage Total Hit LBC UIC UBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 89.9 % 557 501 1 6 49 133 13 355 7 137 8
Current Date: 2023-04-08 17:13:01 Functions: 96.3 % 27 26 1 9 6 11 9
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 100.0 % 5 5 1 3 1
Legend: Lines: hit not hit (120,180] days: 100.0 % 2 2 2
(180,240] days: 100.0 % 8 8 8
(240..) days: 89.7 % 542 486 1 6 49 132 354 7 137
Function coverage date bins:
(240..) days: 72.2 % 36 26 1 9 6 11 9

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*
                                  2                 :  * brin.c
                                  3                 :  *      Implementation of BRIN indexes for Postgres
                                  4                 :  *
                                  5                 :  * See src/backend/access/brin/README for details.
                                  6                 :  *
                                  7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  8                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  9                 :  *
                                 10                 :  * IDENTIFICATION
                                 11                 :  *    src/backend/access/brin/brin.c
                                 12                 :  *
                                 13                 :  * TODO
                                 14                 :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
                                 15                 :  */
                                 16                 : #include "postgres.h"
                                 17                 : 
                                 18                 : #include "access/brin.h"
                                 19                 : #include "access/brin_page.h"
                                 20                 : #include "access/brin_pageops.h"
                                 21                 : #include "access/brin_xlog.h"
                                 22                 : #include "access/relation.h"
                                 23                 : #include "access/reloptions.h"
                                 24                 : #include "access/relscan.h"
                                 25                 : #include "access/table.h"
                                 26                 : #include "access/tableam.h"
                                 27                 : #include "access/xloginsert.h"
                                 28                 : #include "catalog/index.h"
                                 29                 : #include "catalog/pg_am.h"
                                 30                 : #include "commands/vacuum.h"
                                 31                 : #include "miscadmin.h"
                                 32                 : #include "pgstat.h"
                                 33                 : #include "postmaster/autovacuum.h"
                                 34                 : #include "storage/bufmgr.h"
                                 35                 : #include "storage/freespace.h"
                                 36                 : #include "utils/acl.h"
                                 37                 : #include "utils/builtins.h"
                                 38                 : #include "utils/datum.h"
                                 39                 : #include "utils/guc.h"
                                 40                 : #include "utils/index_selfuncs.h"
                                 41                 : #include "utils/memutils.h"
                                 42                 : #include "utils/rel.h"
                                 43                 : 
                                 44                 : 
                                 45                 : /*
                                 46                 :  * We use a BrinBuildState during initial construction of a BRIN index.
                                 47                 :  * The running state is kept in a BrinMemTuple.
                                 48                 :  */
                                 49                 : typedef struct BrinBuildState
                                 50                 : {
                                 51                 :     Relation    bs_irel;
                                 52                 :     int         bs_numtuples;
                                 53                 :     Buffer      bs_currentInsertBuf;
                                 54                 :     BlockNumber bs_pagesPerRange;
                                 55                 :     BlockNumber bs_currRangeStart;
                                 56                 :     BrinRevmap *bs_rmAccess;
                                 57                 :     BrinDesc   *bs_bdesc;
                                 58                 :     BrinMemTuple *bs_dtuple;
                                 59                 : } BrinBuildState;
                                 60                 : 
                                 61                 : /*
                                 62                 :  * Struct used as "opaque" during index scans
                                 63                 :  */
                                 64                 : typedef struct BrinOpaque
                                 65                 : {
                                 66                 :     BlockNumber bo_pagesPerRange;
                                 67                 :     BrinRevmap *bo_rmAccess;
                                 68                 :     BrinDesc   *bo_bdesc;
                                 69                 : } BrinOpaque;
                                 70                 : 
                                 71                 : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber
                                 72                 : 
                                 73                 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
                                 74                 :                                                   BrinRevmap *revmap, BlockNumber pagesPerRange);
                                 75                 : static void terminate_brin_buildstate(BrinBuildState *state);
                                 76                 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
                                 77                 :                           bool include_partial, double *numSummarized, double *numExisting);
                                 78                 : static void form_and_insert_tuple(BrinBuildState *state);
                                 79                 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
                                 80                 :                          BrinTuple *b);
                                 81                 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
                                 82                 : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
                                 83                 :                                 BrinMemTuple *dtup, Datum *values, bool *nulls);
                                 84                 : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
                                 85                 : 
                                 86                 : /*
                                 87                 :  * BRIN handler function: return IndexAmRoutine with access method parameters
                                 88                 :  * and callbacks.
                                 89                 :  */
                                 90                 : Datum
 2639 tgl                        91 GIC         901 : brinhandler(PG_FUNCTION_ARGS)
 2639 tgl                        92 ECB             : {
 2639 tgl                        93 GIC         901 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
 2639 tgl                        94 ECB             : 
 2639 tgl                        95 GIC         901 :     amroutine->amstrategies = 0;
 2639 tgl                        96 CBC         901 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
 1105 akorotkov                  97             901 :     amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
 2639 tgl                        98             901 :     amroutine->amcanorder = false;
                                 99             901 :     amroutine->amcanorderbyop = false;
                                100             901 :     amroutine->amcanbackward = false;
                                101             901 :     amroutine->amcanunique = false;
                                102             901 :     amroutine->amcanmulticol = true;
                                103             901 :     amroutine->amoptionalkey = true;
                                104             901 :     amroutine->amsearcharray = false;
                                105             901 :     amroutine->amsearchnulls = true;
                                106             901 :     amroutine->amstorage = true;
                                107             901 :     amroutine->amclusterable = false;
                                108             901 :     amroutine->ampredlocks = false;
 2244 rhaas                     109             901 :     amroutine->amcanparallel = false;
 1828 teodor                    110             901 :     amroutine->amcaninclude = false;
 1180 akapila                   111             901 :     amroutine->amusemaintenanceworkmem = false;
   20 tomas.vondra              112 GNC         901 :     amroutine->amsummarizing = true;
 1180 akapila                   113 CBC         901 :     amroutine->amparallelvacuumoptions =
 1180 akapila                   114 ECB             :         VACUUM_OPTION_PARALLEL_CLEANUP;
 2639 tgl                       115 CBC         901 :     amroutine->amkeytype = InvalidOid;
                                116                 : 
                                117             901 :     amroutine->ambuild = brinbuild;
 2639 tgl                       118 GIC         901 :     amroutine->ambuildempty = brinbuildempty;
 2639 tgl                       119 CBC         901 :     amroutine->aminsert = brininsert;
                                120             901 :     amroutine->ambulkdelete = brinbulkdelete;
                                121             901 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
                                122             901 :     amroutine->amcanreturn = NULL;
                                123             901 :     amroutine->amcostestimate = brincostestimate;
                                124             901 :     amroutine->amoptions = brinoptions;
 2430                           125             901 :     amroutine->amproperty = NULL;
 1468 alvherre                  126             901 :     amroutine->ambuildphasename = NULL;
 2639 tgl                       127             901 :     amroutine->amvalidate = brinvalidate;
  981                           128             901 :     amroutine->amadjustmembers = NULL;
 2639                           129             901 :     amroutine->ambeginscan = brinbeginscan;
                                130             901 :     amroutine->amrescan = brinrescan;
                                131             901 :     amroutine->amgettuple = NULL;
                                132             901 :     amroutine->amgetbitmap = bringetbitmap;
                                133             901 :     amroutine->amendscan = brinendscan;
                                134             901 :     amroutine->ammarkpos = NULL;
                                135             901 :     amroutine->amrestrpos = NULL;
 2266 rhaas                     136             901 :     amroutine->amestimateparallelscan = NULL;
                                137             901 :     amroutine->aminitparallelscan = NULL;
                                138             901 :     amroutine->amparallelrescan = NULL;
 2639 tgl                       139 ECB             : 
 2639 tgl                       140 CBC         901 :     PG_RETURN_POINTER(amroutine);
                                141                 : }
 2639 tgl                       142 ECB             : 
                                143                 : /*
                                144                 :  * A tuple in the heap is being inserted.  To keep a brin index up to date,
                                145                 :  * we need to obtain the relevant index tuple and compare its stored values
                                146                 :  * with those of the new tuple.  If the tuple values are not consistent with
                                147                 :  * the summary tuple, we need to update the index tuple.
                                148                 :  *
                                149                 :  * If autosummarization is enabled, check if we need to summarize the previous
                                150                 :  * page range.
                                151                 :  *
                                152                 :  * If the range is not currently summarized (i.e. the revmap returns NULL for
                                153                 :  * it), there's nothing to do for this tuple.
                                154                 :  */
                                155                 : bool
 2639 tgl                       156 GIC       38960 : brininsert(Relation idxRel, Datum *values, bool *nulls,
                                157                 :            ItemPointer heaptid, Relation heapRel,
 2250 tgl                       158 ECB             :            IndexUniqueCheck checkUnique,
                                159                 :            bool indexUnchanged,
                                160                 :            IndexInfo *indexInfo)
                                161                 : {
                                162                 :     BlockNumber pagesPerRange;
                                163                 :     BlockNumber origHeapBlk;
                                164                 :     BlockNumber heapBlk;
 2250 tgl                       165 GIC       38960 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
                                166                 :     BrinRevmap *revmap;
 3075 alvherre                  167 CBC       38960 :     Buffer      buf = InvalidBuffer;
 3075 alvherre                  168 GIC       38960 :     MemoryContext tupcxt = NULL;
 2250 tgl                       169 CBC       38960 :     MemoryContext oldcxt = CurrentMemoryContext;
 2199 alvherre                  170           38960 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
 3075 alvherre                  171 ECB             : 
 2557 kgrittn                   172 CBC       38960 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
                                173                 : 
 2199 alvherre                  174 ECB             :     /*
                                175                 :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
                                176                 :      * is the first block in the corresponding page range.
                                177                 :      */
 2199 alvherre                  178 GIC       38960 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
                                179           38960 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
 2199 alvherre                  180 ECB             : 
 3075                           181                 :     for (;;)
 3075 alvherre                  182 UIC           0 :     {
 3075 alvherre                  183 GIC       38960 :         bool        need_insert = false;
 3075 alvherre                  184 EUB             :         OffsetNumber off;
 3075 alvherre                  185 ECB             :         BrinTuple  *brtup;
                                186                 :         BrinMemTuple *dtup;
                                187                 : 
 3075 alvherre                  188 GIC       38960 :         CHECK_FOR_INTERRUPTS();
                                189                 : 
 2199 alvherre                  190 ECB             :         /*
                                191                 :          * If auto-summarization is enabled and we just inserted the first
                                192                 :          * tuple into the first block of a new non-first page range, request a
                                193                 :          * summarization run of the previous range.
                                194                 :          */
 2199 alvherre                  195 GIC       38960 :         if (autosummarize &&
                                196              78 :             heapBlk > 0 &&
 2199 alvherre                  197 CBC          78 :             heapBlk == origHeapBlk &&
                                198              78 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
 2199 alvherre                  199 ECB             :         {
 2199 alvherre                  200 CBC           4 :             BlockNumber lastPageRange = heapBlk - 1;
                                201                 :             BrinTuple  *lastPageTuple;
 2199 alvherre                  202 ECB             : 
                                203                 :             lastPageTuple =
 2199 alvherre                  204 GIC           4 :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
                                205                 :                                          NULL, BUFFER_LOCK_SHARE, NULL);
 2199 alvherre                  206 CBC           4 :             if (!lastPageTuple)
                                207                 :             {
 1809 tgl                       208 ECB             :                 bool        recorded;
                                209                 : 
 1852 alvherre                  210 GIC           3 :                 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
                                211                 :                                                  RelationGetRelid(idxRel),
 1852 alvherre                  212 ECB             :                                                  lastPageRange);
 1852 alvherre                  213 GIC           3 :                 if (!recorded)
 1852 alvherre                  214 UIC           0 :                     ereport(LOG,
 1852 alvherre                  215 ECB             :                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 1852 alvherre                  216 EUB             :                              errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
                                217                 :                                     RelationGetRelationName(idxRel),
                                218                 :                                     lastPageRange)));
                                219                 :             }
                                220                 :             else
 2140 alvherre                  221 GIC           1 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                                222                 :         }
 2199 alvherre                  223 ECB             : 
 2199 alvherre                  224 GIC       38960 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
                                225                 :                                          NULL, BUFFER_LOCK_SHARE, NULL);
 3075 alvherre                  226 ECB             : 
                                227                 :         /* if range is unsummarized, there's nothing to do */
 3075 alvherre                  228 GIC       38960 :         if (!brtup)
                                229           30870 :             break;
 3075 alvherre                  230 ECB             : 
 2250 tgl                       231                 :         /* First time through in this statement? */
 3075 alvherre                  232 GIC        8090 :         if (bdesc == NULL)
                                233                 :         {
 2250 tgl                       234 CBC         509 :             MemoryContextSwitchTo(indexInfo->ii_Context);
 3075 alvherre                  235 GIC         509 :             bdesc = brin_build_desc(idxRel);
 2250 tgl                       236 CBC         509 :             indexInfo->ii_AmCache = (void *) bdesc;
                                237             509 :             MemoryContextSwitchTo(oldcxt);
 2250 tgl                       238 ECB             :         }
                                239                 :         /* First time through in this brininsert call? */
 2250 tgl                       240 GIC        8090 :         if (tupcxt == NULL)
                                241                 :         {
 3075 alvherre                  242 CBC        8090 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
                                243                 :                                            "brininsert cxt",
 2416 tgl                       244 ECB             :                                            ALLOCSET_DEFAULT_SIZES);
 2250 tgl                       245 GIC        8090 :             MemoryContextSwitchTo(tupcxt);
                                246                 :         }
 3075 alvherre                  247 ECB             : 
 2193 alvherre                  248 GIC        8090 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
                                249                 : 
  747 tomas.vondra              250 CBC        8090 :         need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
                                251                 : 
 3075 alvherre                  252            8090 :         if (!need_insert)
                                253                 :         {
 3075 alvherre                  254 ECB             :             /*
                                255                 :              * The tuple is consistent with the new values, so there's nothing
                                256                 :              * to do.
                                257                 :              */
 3075 alvherre                  258 GIC        6412 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                                259                 :         }
 3075 alvherre                  260 ECB             :         else
                                261                 :         {
 2545 kgrittn                   262 GIC        1678 :             Page        page = BufferGetPage(buf);
 3075 alvherre                  263            1678 :             ItemId      lp = PageGetItemId(page, off);
 3075 alvherre                  264 ECB             :             Size        origsz;
                                265                 :             BrinTuple  *origtup;
                                266                 :             Size        newsz;
                                267                 :             BrinTuple  *newtup;
                                268                 :             bool        samepage;
                                269                 : 
                                270                 :             /*
                                271                 :              * Make a copy of the old tuple, so that we can compare it after
                                272                 :              * re-acquiring the lock.
                                273                 :              */
 3075 alvherre                  274 GIC        1678 :             origsz = ItemIdGetLength(lp);
 2193                           275            1678 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
 3075 alvherre                  276 ECB             : 
                                277                 :             /*
                                278                 :              * Before releasing the lock, check if we can attempt a same-page
                                279                 :              * update.  Another process could insert a tuple concurrently in
                                280                 :              * the same page though, so downstream we must be prepared to cope
                                281                 :              * if this turns out to not be possible after all.
                                282                 :              */
 3074 alvherre                  283 GIC        1678 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
 3075                           284            1678 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
 3075 alvherre                  285 CBC        1678 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 3075 alvherre                  286 ECB             : 
                                287                 :             /*
                                288                 :              * Try to update the tuple.  If this doesn't work for whatever
                                289                 :              * reason, we need to restart from the top; the revmap might be
                                290                 :              * pointing at a different tuple for this block now, so we need to
                                291                 :              * recompute to ensure both our new heap tuple and the other
                                292                 :              * inserter's are covered by the combined tuple.  It might be that
                                293                 :              * we don't need to update at all.
                                294                 :              */
 3075 alvherre                  295 GIC        1678 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
                                296                 :                                buf, off, origtup, origsz, newtup, newsz,
 3075 alvherre                  297 ECB             :                                samepage))
                                298                 :             {
                                299                 :                 /* no luck; start over */
 3075 alvherre                  300 UIC           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);
                                301               0 :                 continue;
 3075 alvherre                  302 EUB             :             }
                                303                 :         }
                                304                 : 
                                305                 :         /* success! */
 3075 alvherre                  306 GIC        8090 :         break;
                                307                 :     }
 3075 alvherre                  308 ECB             : 
 3075 alvherre                  309 GIC       38960 :     brinRevmapTerminate(revmap);
                                310           38960 :     if (BufferIsValid(buf))
 3075 alvherre                  311 CBC        8091 :         ReleaseBuffer(buf);
 2250 tgl                       312           38960 :     MemoryContextSwitchTo(oldcxt);
                                313           38960 :     if (tupcxt != NULL)
 3075 alvherre                  314            8090 :         MemoryContextDelete(tupcxt);
 3075 alvherre                  315 ECB             : 
 2639 tgl                       316 CBC       38960 :     return false;
                                317                 : }
 3075 alvherre                  318 ECB             : 
                                319                 : /*
                                320                 :  * Initialize state for a BRIN index scan.
                                321                 :  *
                                322                 :  * We read the metapage here to determine the pages-per-range number that this
                                323                 :  * index was built with.  Note that since this cannot be changed while we're
                                324                 :  * holding lock on index, it's not necessary to recompute it during brinrescan.
                                325                 :  */
                                326                 : IndexScanDesc
 2639 tgl                       327 GIC        1290 : brinbeginscan(Relation r, int nkeys, int norderbys)
                                328                 : {
 3075 alvherre                  329 ECB             :     IndexScanDesc scan;
                                330                 :     BrinOpaque *opaque;
                                331                 : 
 3075 alvherre                  332 GIC        1290 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
                                333                 : 
  209 peter                     334 GNC        1290 :     opaque = palloc_object(BrinOpaque);
 2557 kgrittn                   335 GIC        1290 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
 2557 kgrittn                   336 ECB             :                                                scan->xs_snapshot);
 3075 alvherre                  337 CBC        1290 :     opaque->bo_bdesc = brin_build_desc(r);
 3075 alvherre                  338 GIC        1290 :     scan->opaque = opaque;
 3075 alvherre                  339 ECB             : 
 2639 tgl                       340 CBC        1290 :     return scan;
                                341                 : }
 3075 alvherre                  342 ECB             : 
                                343                 : /*
                                344                 :  * Execute the index scan.
                                345                 :  *
                                346                 :  * This works by reading index TIDs from the revmap, and obtaining the index
                                347                 :  * tuples pointed to by them; the summary values in the index tuples are
                                348                 :  * compared to the scan keys.  We return into the TID bitmap all the pages in
                                349                 :  * ranges corresponding to index tuples that match the scan keys.
                                350                 :  *
                                351                 :  * If a TID from the revmap is read as InvalidTID, we know that range is
                                352                 :  * unsummarized.  Pages in those ranges need to be returned regardless of scan
                                353                 :  * keys.
                                354                 :  */
                                355                 : int64
 2639 tgl                       356 GIC        1290 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
                                357                 : {
 3075 alvherre                  358 CBC        1290 :     Relation    idxRel = scan->indexRelation;
 3075 alvherre                  359 GIC        1290 :     Buffer      buf = InvalidBuffer;
 3075 alvherre                  360 ECB             :     BrinDesc   *bdesc;
                                361                 :     Oid         heapOid;
                                362                 :     Relation    heapRel;
                                363                 :     BrinOpaque *opaque;
                                364                 :     BlockNumber nblocks;
                                365                 :     BlockNumber heapBlk;
 3075 alvherre                  366 GIC        1290 :     int         totalpages = 0;
                                367                 :     FmgrInfo   *consistentFn;
 3075 alvherre                  368 ECB             :     MemoryContext oldcxt;
                                369                 :     MemoryContext perRangeCxt;
                                370                 :     BrinMemTuple *dtup;
 2153 bruce                     371 GIC        1290 :     BrinTuple  *btup = NULL;
 2193 alvherre                  372            1290 :     Size        btupsz = 0;
  747 tomas.vondra              373 ECB             :     ScanKey   **keys,
                                374                 :               **nullkeys;
                                375                 :     int        *nkeys,
                                376                 :                *nnullkeys;
                                377                 :     char       *ptr;
                                378                 :     Size        len;
                                379                 :     char       *tmp PG_USED_FOR_ASSERTS_ONLY;
                                380                 : 
 3075 alvherre                  381 GIC        1290 :     opaque = (BrinOpaque *) scan->opaque;
 3075 alvherre                  382 CBC        1290 :     bdesc = opaque->bo_bdesc;
                                383            1290 :     pgstat_count_index_scan(idxRel);
 3075 alvherre                  384 ECB             : 
                                385                 :     /*
                                386                 :      * We need to know the size of the table so that we know how long to
                                387                 :      * iterate on the revmap.
                                388                 :      */
 3075 alvherre                  389 GIC        1290 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
 1539 andres                    390 CBC        1290 :     heapRel = table_open(heapOid, AccessShareLock);
 3075 alvherre                  391            1290 :     nblocks = RelationGetNumberOfBlocks(heapRel);
 1539 andres                    392            1290 :     table_close(heapRel, AccessShareLock);
 3075 alvherre                  393 ECB             : 
                                394                 :     /*
                                395                 :      * Make room for the consistent support procedures of indexed columns.  We
                                396                 :      * don't look them up here; we do that lazily the first time we see a scan
                                397                 :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
                                398                 :      */
  209 peter                     399 GNC        1290 :     consistentFn = palloc0_array(FmgrInfo, bdesc->bd_tupdesc->natts);
 3075 alvherre                  400 ECB             : 
                                401                 :     /*
                                402                 :      * Make room for per-attribute lists of scan keys that we'll pass to the
                                403                 :      * consistent support procedure. We don't know which attributes have scan
                                404                 :      * keys, so we allocate space for all attributes. That may use more memory
                                405                 :      * but it's probably cheaper than determining which attributes are used.
                                406                 :      *
                                407                 :      * We keep null and regular keys separate, so that we can pass just the
                                408                 :      * regular keys to the consistent function easily.
                                409                 :      *
                                410                 :      * To reduce the allocation overhead, we allocate one big chunk and then
                                411                 :      * carve it into smaller arrays ourselves. All the pieces have exactly the
                                412                 :      * same lifetime, so that's OK.
                                413                 :      *
                                414                 :      * XXX The widest index can have 32 attributes, so the amount of wasted
                                415                 :      * memory is negligible. We could invent a more compact approach (with
                                416                 :      * just space for used attributes) but that would make the matching more
                                417                 :      * complex so it's not a good trade-off.
                                418                 :      */
  747 tomas.vondra              419 GIC        1290 :     len =
  747 tomas.vondra              420 CBC        1290 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* regular keys */
                                421            1290 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
                                422            1290 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
                                423            1290 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* NULL keys */
                                424            1290 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
                                425            1290 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
  747 tomas.vondra              426 ECB             : 
  747 tomas.vondra              427 GIC        1290 :     ptr = palloc(len);
  747 tomas.vondra              428 CBC        1290 :     tmp = ptr;
  747 tomas.vondra              429 ECB             : 
  747 tomas.vondra              430 GIC        1290 :     keys = (ScanKey **) ptr;
  747 tomas.vondra              431 CBC        1290 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
  747 tomas.vondra              432 ECB             : 
  747 tomas.vondra              433 GIC        1290 :     nullkeys = (ScanKey **) ptr;
  747 tomas.vondra              434 CBC        1290 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
  747 tomas.vondra              435 ECB             : 
  747 tomas.vondra              436 GIC        1290 :     nkeys = (int *) ptr;
  747 tomas.vondra              437 CBC        1290 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
  747 tomas.vondra              438 ECB             : 
  747 tomas.vondra              439 GIC        1290 :     nnullkeys = (int *) ptr;
  747 tomas.vondra              440 CBC        1290 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
  747 tomas.vondra              441 ECB             : 
  747 tomas.vondra              442 GIC       34623 :     for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
  747 tomas.vondra              443 ECB             :     {
  747 tomas.vondra              444 GIC       33333 :         keys[i] = (ScanKey *) ptr;
  747 tomas.vondra              445 CBC       33333 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
  747 tomas.vondra              446 ECB             : 
  747 tomas.vondra              447 GIC       33333 :         nullkeys[i] = (ScanKey *) ptr;
  747 tomas.vondra              448 CBC       33333 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
  747 tomas.vondra              449 ECB             :     }
                                450                 : 
  747 tomas.vondra              451 GIC        1290 :     Assert(tmp + len == ptr);
  747 tomas.vondra              452 ECB             : 
                                453                 :     /* zero the number of keys */
  747 tomas.vondra              454 GIC        1290 :     memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
  747 tomas.vondra              455 CBC        1290 :     memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
  747 tomas.vondra              456 ECB             : 
                                457                 :     /* Preprocess the scan keys - split them into per-attribute arrays. */
  228 drowley                   458 GNC        2580 :     for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
  747 tomas.vondra              459 ECB             :     {
  747 tomas.vondra              460 GIC        1290 :         ScanKey     key = &scan->keyData[keyno];
  747 tomas.vondra              461 CBC        1290 :         AttrNumber  keyattno = key->sk_attno;
  747 tomas.vondra              462 ECB             : 
                                463                 :         /*
                                464                 :          * The collation of the scan key must match the collation used in the
                                465                 :          * index column (but only if the search is not IS NULL/ IS NOT NULL).
                                466                 :          * Otherwise we shouldn't be using this index ...
                                467                 :          */
  747 tomas.vondra              468 GIC        1290 :         Assert((key->sk_flags & SK_ISNULL) ||
  747 tomas.vondra              469 ECB             :                (key->sk_collation ==
                                470                 :                 TupleDescAttr(bdesc->bd_tupdesc,
                                471                 :                               keyattno - 1)->attcollation));
                                472                 : 
                                473                 :         /*
                                474                 :          * First time we see this index attribute, so init as needed.
                                475                 :          *
                                 476                 :          * This is a bit of an overkill - we don't know how many scan keys
                                 477                 :          * there are for this attribute, so we simply allocate the largest
                                 478                 :          * number possible (as if all keys were for this attribute). This may
                                 479                 :          * waste a bit of memory, but we only expect a small number of scan
                                 480                 :          * keys in general, so this should be negligible, and repeated
                                 481                 :          * repalloc calls are not free either.
                                482                 :          */
  747 tomas.vondra              483 GIC        1290 :         if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
  747 tomas.vondra              484 ECB             :         {
                                485                 :             FmgrInfo   *tmp;
                                486                 : 
                                487                 :             /* First time we see this attribute, so no key/null keys. */
  747 tomas.vondra              488 GIC        1290 :             Assert(nkeys[keyattno - 1] == 0);
  747 tomas.vondra              489 CBC        1290 :             Assert(nnullkeys[keyattno - 1] == 0);
  747 tomas.vondra              490 ECB             : 
  747 tomas.vondra              491 GIC        1290 :             tmp = index_getprocinfo(idxRel, keyattno,
  747 tomas.vondra              492 ECB             :                                     BRIN_PROCNUM_CONSISTENT);
  747 tomas.vondra              493 GIC        1290 :             fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
  747 tomas.vondra              494 ECB             :                            CurrentMemoryContext);
                                495                 :         }
                                496                 : 
                                497                 :         /* Add key to the proper per-attribute array. */
  747 tomas.vondra              498 GIC        1290 :         if (key->sk_flags & SK_ISNULL)
  747 tomas.vondra              499 ECB             :         {
  747 tomas.vondra              500 GIC          18 :             nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
  747 tomas.vondra              501 CBC          18 :             nnullkeys[keyattno - 1]++;
  747 tomas.vondra              502 ECB             :         }
                                503                 :         else
                                504                 :         {
  747 tomas.vondra              505 GIC        1272 :             keys[keyattno - 1][nkeys[keyattno - 1]] = key;
  747 tomas.vondra              506 CBC        1272 :             nkeys[keyattno - 1]++;
  747 tomas.vondra              507 ECB             :         }
                                508                 :     }
                                509                 : 
                                 510                 :     /* allocate an initial in-memory tuple, outside the per-range memcxt */
 2193 alvherre                  511 GIC        1290 :     dtup = brin_new_memtuple(bdesc);
 2193 alvherre                  512 ECB             : 
                                513                 :     /*
                                514                 :      * Setup and use a per-range memory context, which is reset every time we
                                515                 :      * loop below.  This avoids having to free the tuples within the loop.
                                516                 :      */
 3075 alvherre                  517 GIC        1290 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
 3075 alvherre                  518 ECB             :                                         "bringetbitmap cxt",
                                519                 :                                         ALLOCSET_DEFAULT_SIZES);
 3075 alvherre                  520 GIC        1290 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
 3075 alvherre                  521 ECB             : 
                                522                 :     /*
                                523                 :      * Now scan the revmap.  We start by querying for heap page 0,
                                524                 :      * incrementing by the number of pages per range; this gives us a full
                                525                 :      * view of the table.
                                526                 :      */
 3075 alvherre                  527 GIC       95217 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
 3075 alvherre                  528 ECB             :     {
                                529                 :         bool        addrange;
 2193 alvherre                  530 GIC       93927 :         bool        gottuple = false;
 3075 alvherre                  531 ECB             :         BrinTuple  *tup;
                                532                 :         OffsetNumber off;
                                533                 :         Size        size;
                                534                 : 
 3075 alvherre                  535 GIC       93927 :         CHECK_FOR_INTERRUPTS();
 3075 alvherre                  536 ECB             : 
 3075 alvherre                  537 GIC       93927 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
 3075 alvherre                  538 ECB             : 
 3075 alvherre                  539 GIC       93927 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
 2557 kgrittn                   540 ECB             :                                        &off, &size, BUFFER_LOCK_SHARE,
                                541                 :                                        scan->xs_snapshot);
 3075 alvherre                  542 GIC       93927 :         if (tup)
 3075 alvherre                  543 ECB             :         {
 2193 alvherre                  544 GIC       93927 :             gottuple = true;
 2193 alvherre                  545 CBC       93927 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);
 3075                           546           93927 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 3075 alvherre                  547 ECB             :         }
                                548                 : 
                                549                 :         /*
                                550                 :          * For page ranges with no indexed tuple, we must return the whole
                                551                 :          * range; otherwise, compare it to the scan keys.
                                552                 :          */
 2193 alvherre                  553 GIC       93927 :         if (!gottuple)
 3075 alvherre                  554 ECB             :         {
 3075 alvherre                  555 UIC           0 :             addrange = true;
 3075 alvherre                  556 EUB             :         }
                                557                 :         else
                                558                 :         {
 2193 alvherre                  559 GIC       93927 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
 3075 alvherre                  560 CBC       93927 :             if (dtup->bt_placeholder)
 3075 alvherre                  561 ECB             :             {
                                562                 :                 /*
                                563                 :                  * Placeholder tuples are always returned, regardless of the
                                564                 :                  * values stored in them.
                                565                 :                  */
 3075 alvherre                  566 UIC           0 :                 addrange = true;
 3075 alvherre                  567 EUB             :             }
                                568                 :             else
                                569                 :             {
                                570                 :                 int         attno;
                                571                 : 
                                572                 :                 /*
                                573                 :                  * Compare scan keys with summary values stored for the range.
                                574                 :                  * If scan keys are matched, the page range must be added to
                                575                 :                  * the bitmap.  We initially assume the range needs to be
                                576                 :                  * added; in particular this serves the case where there are
                                577                 :                  * no keys.
                                578                 :                  */
 3075 alvherre                  579 GIC       93927 :                 addrange = true;
  747 tomas.vondra              580 CBC     2350500 :                 for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
 3075 alvherre                  581 ECB             :                 {
                                582                 :                     BrinValues *bval;
                                583                 :                     Datum       add;
                                584                 :                     Oid         collation;
                                585                 : 
                                586                 :                     /*
                                587                 :                      * skip attributes without any scan keys (both regular and
                                588                 :                      * IS [NOT] NULL)
                                589                 :                      */
  747 tomas.vondra              590 GIC     2282826 :                     if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
  747 tomas.vondra              591 CBC     2188899 :                         continue;
  747 tomas.vondra              592 ECB             : 
  747 tomas.vondra              593 GIC       93927 :                     bval = &dtup->bt_columns[attno - 1];
  747 tomas.vondra              594 ECB             : 
                                595                 :                     /*
                                596                 :                      * First check if there are any IS [NOT] NULL scan keys,
                                597                 :                      * and if we're violating them. In that case we can
                                598                 :                      * terminate early, without invoking the support function.
                                599                 :                      *
                                600                 :                      * As there may be more keys, we can only determine
                                601                 :                      * mismatch within this loop.
                                602                 :                      */
  747 tomas.vondra              603 GIC       93927 :                     if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
  747 tomas.vondra              604 CBC       93927 :                         !check_null_keys(bval, nullkeys[attno - 1],
                                605           93927 :                                          nnullkeys[attno - 1]))
  747 tomas.vondra              606 ECB             :                     {
                                607                 :                         /*
                                608                 :                          * If any of the IS [NOT] NULL keys failed, the page
                                609                 :                          * range as a whole can't pass. So terminate the loop.
                                610                 :                          */
  747 tomas.vondra              611 GIC         498 :                         addrange = false;
  747 tomas.vondra              612 CBC         498 :                         break;
  747 tomas.vondra              613 ECB             :                     }
                                614                 : 
                                615                 :                     /*
                                616                 :                      * So either there are no IS [NOT] NULL keys, or all
                                617                 :                      * passed. If there are no regular scan keys, we're done -
                                618                 :                      * the page range matches. If there are regular keys, but
                                619                 :                      * the page range is marked as 'all nulls' it can't
                                620                 :                      * possibly pass (we're assuming the operators are
                                621                 :                      * strict).
                                622                 :                      */
                                623                 : 
                                624                 :                     /* No regular scan keys - page range as a whole passes. */
  747 tomas.vondra              625 GIC       93429 :                     if (!nkeys[attno - 1])
  747 tomas.vondra              626 CBC         618 :                         continue;
  747 tomas.vondra              627 ECB             : 
  747 tomas.vondra              628 GIC       92811 :                     Assert((nkeys[attno - 1] > 0) &&
  747 tomas.vondra              629 ECB             :                            (nkeys[attno - 1] <= scan->numberOfKeys));
                                630                 : 
                                631                 :                     /* If it is all nulls, it cannot possibly be consistent. */
  747 tomas.vondra              632 GIC       92811 :                     if (bval->bv_allnulls)
  747 tomas.vondra              633 ECB             :                     {
  747 tomas.vondra              634 GIC         189 :                         addrange = false;
  747 tomas.vondra              635 CBC         189 :                         break;
  747 tomas.vondra              636 ECB             :                     }
                                637                 : 
                                638                 :                     /*
                                639                 :                      * Collation from the first key (has to be the same for
                                640                 :                      * all keys for the same attribute).
                                641                 :                      */
  744 tomas.vondra              642 GIC       92622 :                     collation = keys[attno - 1][0]->sk_collation;
  744 tomas.vondra              643 ECB             : 
                                644                 :                     /*
                                645                 :                      * Check whether the scan key is consistent with the page
                                646                 :                      * range values; if so, have the pages in the range added
                                647                 :                      * to the output bitmap.
                                648                 :                      *
                                649                 :                      * The opclass may or may not support processing of
                                650                 :                      * multiple scan keys. We can determine that based on the
                                 651                 :                      * number of arguments - functions with an extra parameter
                                652                 :                      * (number of scan keys) do support this, otherwise we
                                653                 :                      * have to simply pass the scan keys one by one.
                                654                 :                      */
  744 tomas.vondra              655 GIC       92622 :                     if (consistentFn[attno - 1].fn_nargs >= 4)
  744 tomas.vondra              656 ECB             :                     {
                                657                 :                         /* Check all keys at once */
  744 tomas.vondra              658 GIC       18756 :                         add = FunctionCall4Coll(&consistentFn[attno - 1],
  744 tomas.vondra              659 ECB             :                                                 collation,
                                660                 :                                                 PointerGetDatum(bdesc),
                                661                 :                                                 PointerGetDatum(bval),
  744 tomas.vondra              662 GIC       18756 :                                                 PointerGetDatum(keys[attno - 1]),
  744 tomas.vondra              663 CBC       18756 :                                                 Int32GetDatum(nkeys[attno - 1]));
                                664           18756 :                         addrange = DatumGetBool(add);
  744 tomas.vondra              665 ECB             :                     }
                                666                 :                     else
                                667                 :                     {
                                668                 :                         /*
                                669                 :                          * Check keys one by one
                                670                 :                          *
                                671                 :                          * When there are multiple scan keys, failure to meet
                                672                 :                          * the criteria for a single one of them is enough to
                                673                 :                          * discard the range as a whole, so break out of the
                                674                 :                          * loop as soon as a false return value is obtained.
                                675                 :                          */
                                676                 :                         int         keyno;
                                677                 : 
  744 tomas.vondra              678 GIC      129039 :                         for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
  744 tomas.vondra              679 ECB             :                         {
  744 tomas.vondra              680 GIC       73866 :                             add = FunctionCall3Coll(&consistentFn[attno - 1],
  744 tomas.vondra              681 CBC       73866 :                                                     keys[attno - 1][keyno]->sk_collation,
  744 tomas.vondra              682 ECB             :                                                     PointerGetDatum(bdesc),
                                683                 :                                                     PointerGetDatum(bval),
  744 tomas.vondra              684 GIC       73866 :                                                     PointerGetDatum(keys[attno - 1][keyno]));
  744 tomas.vondra              685 CBC       73866 :                             addrange = DatumGetBool(add);
                                686           73866 :                             if (!addrange)
                                687           18693 :                                 break;
  744 tomas.vondra              688 ECB             :                         }
                                689                 :                     }
                                690                 : 
                                691                 :                     /*
                                692                 :                      * If we found a scan key eliminating the range, no need to
                                693                 :                      * check additional ones.
                                694                 :                      */
   49 tomas.vondra              695 GIC       92622 :                     if (!addrange)
   49 tomas.vondra              696 CBC       25566 :                         break;
 3075 alvherre                  697 ECB             :                 }
                                698                 :             }
                                699                 :         }
                                700                 : 
                                701                 :         /* add the pages in the range to the output bitmap, if needed */
 3075 alvherre                  702 GIC       93927 :         if (addrange)
 3075 alvherre                  703 ECB             :         {
                                704                 :             BlockNumber pageno;
                                705                 : 
 3075 alvherre                  706 GIC       67674 :             for (pageno = heapBlk;
  732 tomas.vondra              707 CBC      135348 :                  pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
 3075 alvherre                  708           67674 :                  pageno++)
 3075 alvherre                  709 ECB             :             {
 3075 alvherre                  710 GIC       67674 :                 MemoryContextSwitchTo(oldcxt);
 3075 alvherre                  711 CBC       67674 :                 tbm_add_page(tbm, pageno);
                                712           67674 :                 totalpages++;
                                713           67674 :                 MemoryContextSwitchTo(perRangeCxt);
 3075 alvherre                  714 ECB             :             }
                                715                 :         }
                                716                 :     }
                                717                 : 
 3075 alvherre                  718 GIC        1290 :     MemoryContextSwitchTo(oldcxt);
 3075 alvherre                  719 CBC        1290 :     MemoryContextDelete(perRangeCxt);
 3075 alvherre                  720 ECB             : 
 3075 alvherre                  721 GIC        1290 :     if (buf != InvalidBuffer)
 3075 alvherre                  722 CBC        1290 :         ReleaseBuffer(buf);
 3075 alvherre                  723 ECB             : 
                                724                 :     /*
                                725                 :      * XXX We have an approximation of the number of *pages* that our scan
                                726                 :      * returns, but we don't have a precise idea of the number of heap tuples
                                727                 :      * involved.
                                728                 :      */
 2639 tgl                       729 GIC        1290 :     return totalpages * 10;
 3075 alvherre                  730 ECB             : }
                                731                 : 
                                732                 : /*
                                733                 :  * Re-initialize state for a BRIN index scan
                                734                 :  */
                                735                 : void
 2639 tgl                       736 GIC        1290 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 2639 tgl                       737 ECB             :            ScanKey orderbys, int norderbys)
                                738                 : {
                                739                 :     /*
                                740                 :      * Other index AMs preprocess the scan keys at this point, or sometime
                                741                 :      * early during the scan; this lets them optimize by removing redundant
                                742                 :      * keys, or doing early returns when they are impossible to satisfy; see
                                743                 :      * _bt_preprocess_keys for an example.  Something like that could be added
                                744                 :      * here someday, too.
                                745                 :      */
                                746                 : 
 3075 alvherre                  747 GIC        1290 :     if (scankey && scan->numberOfKeys > 0)
 3075 alvherre                  748 CBC        1290 :         memmove(scan->keyData, scankey,
                                749            1290 :                 scan->numberOfKeys * sizeof(ScanKeyData));
                                750            1290 : }
 3075 alvherre                  751 ECB             : 
                                752                 : /*
                                753                 :  * Close down a BRIN index scan
                                754                 :  */
                                755                 : void
 2639 tgl                       756 GIC        1290 : brinendscan(IndexScanDesc scan)
 3075 alvherre                  757 ECB             : {
 3075 alvherre                  758 GIC        1290 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
 3075 alvherre                  759 ECB             : 
 3075 alvherre                  760 GIC        1290 :     brinRevmapTerminate(opaque->bo_rmAccess);
 3075 alvherre                  761 CBC        1290 :     brin_free_desc(opaque->bo_bdesc);
                                762            1290 :     pfree(opaque);
                                763            1290 : }
 3075 alvherre                  764 ECB             : 
                                765                 : /*
                                766                 :  * Per-heap-tuple callback for table_index_build_scan.
                                767                 :  *
                                768                 :  * Note we don't worry about the page range at the end of the table here; it is
                                769                 :  * present in the build state struct after we're called the last time, but not
                                770                 :  * inserted into the index.  Caller must ensure to do so, if appropriate.
                                771                 :  */
                                772                 : static void
 3075 alvherre                  773 GIC      346317 : brinbuildCallback(Relation index,
 1248 andres                    774 ECB             :                   ItemPointer tid,
                                775                 :                   Datum *values,
                                776                 :                   bool *isnull,
                                777                 :                   bool tupleIsAlive,
                                778                 :                   void *brstate)
                                779                 : {
 3075 alvherre                  780 GIC      346317 :     BrinBuildState *state = (BrinBuildState *) brstate;
 3075 alvherre                  781 ECB             :     BlockNumber thisblock;
                                782                 : 
 1248 andres                    783 GIC      346317 :     thisblock = ItemPointerGetBlockNumber(tid);
 3075 alvherre                  784 ECB             : 
                                785                 :     /*
                                786                 :      * If we're in a block that belongs to a future range, summarize what
                                787                 :      * we've got and start afresh.  Note the scan might have skipped many
                                788                 :      * pages, if they were devoid of live tuples; make sure to insert index
                                789                 :      * tuples for those too.
                                790                 :      */
 3075 alvherre                  791 GIC      347322 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
 3075 alvherre                  792 ECB             :     {
                                793                 : 
                                794                 :         BRIN_elog((DEBUG2,
                                795                 :                    "brinbuildCallback: completed a range: %u--%u",
                                796                 :                    state->bs_currRangeStart,
                                797                 :                    state->bs_currRangeStart + state->bs_pagesPerRange));
                                798                 : 
                                799                 :         /* create the index tuple and insert it */
 3075 alvherre                  800 GIC        1005 :         form_and_insert_tuple(state);
 3075 alvherre                  801 ECB             : 
                                802                 :         /* set state to correspond to the next range */
 3075 alvherre                  803 GIC        1005 :         state->bs_currRangeStart += state->bs_pagesPerRange;
 3075 alvherre                  804 ECB             : 
                                805                 :         /* re-initialize state for it */
 3075 alvherre                  806 GIC        1005 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
 3075 alvherre                  807 ECB             :     }
                                808                 : 
                                809                 :     /* Accumulate the current tuple into the running state */
  747 tomas.vondra              810 GIC      346317 :     (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
  747 tomas.vondra              811 ECB             :                                values, isnull);
 3075 alvherre                  812 GIC      346317 : }
 3075 alvherre                  813 ECB             : 
                                814                 : /*
                                815                 :  * brinbuild() -- build a new BRIN index.
                                816                 :  */
                                817                 : IndexBuildResult *
 2639 tgl                       818 GIC         119 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 3075 alvherre                  819 ECB             : {
                                820                 :     IndexBuildResult *result;
                                821                 :     double      reltuples;
                                822                 :     double      idxtuples;
                                823                 :     BrinRevmap *revmap;
                                824                 :     BrinBuildState *state;
                                825                 :     Buffer      meta;
                                826                 :     BlockNumber pagesPerRange;
                                827                 : 
                                828                 :     /*
                                829                 :      * We expect to be called exactly once for any index relation.
                                830                 :      */
 3075 alvherre                  831 GIC         119 :     if (RelationGetNumberOfBlocks(index) != 0)
 3075 alvherre                  832 LBC           0 :         elog(ERROR, "index \"%s\" already contains data",
 3075 alvherre                  833 EUB             :              RelationGetRelationName(index));
                                834                 : 
                                835                 :     /*
                                836                 :      * Critical section not required, because on error the creation of the
                                837                 :      * whole relation will be rolled back.
                                838                 :      */
                                839                 : 
    4 andres                    840 GNC         119 :     meta = ExtendBufferedRel(EB_REL(index), MAIN_FORKNUM, NULL,
                                841                 :                              EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
 3075 alvherre                  842 CBC         119 :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
 3075 alvherre                  843 ECB             : 
 2545 kgrittn                   844 GIC         119 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
 3075 alvherre                  845 ECB             :                        BRIN_CURRENT_VERSION);
 3075 alvherre                  846 GIC         119 :     MarkBufferDirty(meta);
 3075 alvherre                  847 ECB             : 
 3075 alvherre                  848 GIC         119 :     if (RelationNeedsWAL(index))
 3075 alvherre                  849 ECB             :     {
                                850                 :         xl_brin_createidx xlrec;
                                851                 :         XLogRecPtr  recptr;
                                852                 :         Page        page;
                                853                 : 
 3075 alvherre                  854 GIC          51 :         xlrec.version = BRIN_CURRENT_VERSION;
 3075 alvherre                  855 CBC          51 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
 3075 alvherre                  856 ECB             : 
 3062 heikki.linnakangas        857 GIC          51 :         XLogBeginInsert();
 3062 heikki.linnakangas        858 CBC          51 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
 1984 tgl                       859              51 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
 3075 alvherre                  860 ECB             : 
 3062 heikki.linnakangas        861 GIC          51 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
 3075 alvherre                  862 ECB             : 
 2545 kgrittn                   863 GIC          51 :         page = BufferGetPage(meta);
 3075 alvherre                  864 CBC          51 :         PageSetLSN(page, recptr);
 3075 alvherre                  865 ECB             :     }
                                866                 : 
 3075 alvherre                  867 GIC         119 :     UnlockReleaseBuffer(meta);
 3075 alvherre                  868 ECB             : 
                                869                 :     /*
                                870                 :      * Initialize our state, including the deformed tuple state.
                                871                 :      */
 2557 kgrittn                   872 GIC         119 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 3075 alvherre                  873 CBC         119 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 3075 alvherre                  874 ECB             : 
                                875                 :     /*
                                876                 :      * Now scan the relation.  No syncscan allowed here because we want the
                                877                 :      * heap blocks in physical order.
                                878                 :      */
 1468 alvherre                  879 GIC         119 :     reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
 1474 andres                    880 ECB             :                                        brinbuildCallback, (void *) state, NULL);
                                881                 : 
                                882                 :     /* process the final batch */
 3075 alvherre                  883 GIC         119 :     form_and_insert_tuple(state);
 3075 alvherre                  884 ECB             : 
                                885                 :     /* release resources */
 3075 alvherre                  886 GIC         119 :     idxtuples = state->bs_numtuples;
 3075 alvherre                  887 CBC         119 :     brinRevmapTerminate(state->bs_rmAccess);
                                888             119 :     terminate_brin_buildstate(state);
 3075 alvherre                  889 ECB             : 
                                890                 :     /*
                                891                 :      * Return statistics
                                892                 :      */
  209 peter                     893 GNC         119 :     result = palloc_object(IndexBuildResult);
 3075 alvherre                  894 ECB             : 
 3075 alvherre                  895 GIC         119 :     result->heap_tuples = reltuples;
 3075 alvherre                  896 CBC         119 :     result->index_tuples = idxtuples;
 3075 alvherre                  897 ECB             : 
 2639 tgl                       898 GIC         119 :     return result;
 3075 alvherre                  899 ECB             : }
                                900                 : 
                                901                 : void
 2639 tgl                       902 GIC           3 : brinbuildempty(Relation index)
 3075 alvherre                  903 ECB             : {
                                904                 :     Buffer      metabuf;
                                905                 : 
                                906                 :     /* An empty BRIN index has a metapage only. */
    4 andres                    907 GNC           3 :     metabuf = ExtendBufferedRel(EB_REL(index), INIT_FORKNUM, NULL,
                                908                 :                                 EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
                                909                 : 
                                910                 :     /* Initialize and xlog metabuffer. */
 3075 alvherre                  911 CBC           3 :     START_CRIT_SECTION();
 2545 kgrittn                   912               3 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
                                913                 :                        BRIN_CURRENT_VERSION);
 3075 alvherre                  914               3 :     MarkBufferDirty(metabuf);
 1984 tgl                       915               3 :     log_newpage_buffer(metabuf, true);
 3075 alvherre                  916               3 :     END_CRIT_SECTION();
                                917                 : 
                                918               3 :     UnlockReleaseBuffer(metabuf);
                                919               3 : }
                                920                 : 
                                921                 : /*
                                922                 :  * brinbulkdelete
                                923                 :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
                                924                 :  *      there's not a lot we can do here.
                                925                 :  *
                                926                 :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
                                927                 :  * tuple is deleted), meaning the need to re-run summarization on the affected
                                928                 :  * range.  Would need to add an extra flag in brintuples for that.
                                929                 :  */
                                930                 : IndexBulkDeleteResult *
 2639 tgl                       931               8 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
                                932                 :                IndexBulkDeleteCallback callback, void *callback_state)
                                933                 : {
                                934                 :     /* allocate stats if first time through, else re-use existing struct */
 3075 alvherre                  935               8 :     if (stats == NULL)
  209 peter                     936 GNC           8 :         stats = palloc0_object(IndexBulkDeleteResult);
                                937                 : 
 2639 tgl                       938 CBC           8 :     return stats;
                                939                 : }
                                940                 : 
                                941                 : /*
                                942                 :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
                                943                 :  * ranges that are currently unsummarized.
                                944                 :  */
                                945                 : IndexBulkDeleteResult *
                                946              43 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
                                947                 : {
                                948                 :     Relation    heapRel;
                                949                 : 
                                950                 :     /* No-op in ANALYZE ONLY mode */
 3075 alvherre                  951              43 :     if (info->analyze_only)
 2639 tgl                       952               1 :         return stats;
                                953                 : 
 3075 alvherre                  954              42 :     if (!stats)
  209 peter                     955 GNC          34 :         stats = palloc0_object(IndexBulkDeleteResult);
 3075 alvherre                  956 CBC          42 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
                                957                 :     /* rest of stats is initialized by zeroing */
                                958                 : 
 1539 andres                    959              42 :     heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
                                960                 :                          AccessShareLock);
                                961                 : 
 2797 alvherre                  962              42 :     brin_vacuum_scan(info->index, info->strategy);
                                963                 : 
 1983                           964              42 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
                                965                 :                   &stats->num_index_tuples, &stats->num_index_tuples);
                                966                 : 
 1539 andres                    967              42 :     table_close(heapRel, AccessShareLock);
                                968                 : 
 2639 tgl                       969              42 :     return stats;
                                970                 : }
                                971                 : 
                                972                 : /*
                                973                 :  * reloptions processor for BRIN indexes
                                974                 :  */
                                975                 : bytea *
                                976             264 : brinoptions(Datum reloptions, bool validate)
                                977                 : {
                                978                 :     static const relopt_parse_elt tab[] = {
                                979                 :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
                                980                 :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
                                981                 :     };
                                982                 : 
 1251 michael                   983             264 :     return (bytea *) build_reloptions(reloptions, validate,
                                984                 :                                       RELOPT_KIND_BRIN,
                                985                 :                                       sizeof(BrinOptions),
                                986                 :                                       tab, lengthof(tab));
                                987                 : }
                                988                 : 
                                989                 : /*
                                990                 :  * SQL-callable function to scan through an index and summarize all ranges
                                991                 :  * that are not currently summarized.
                                992                 :  */
                                993                 : Datum
 3075 alvherre                  994              38 : brin_summarize_new_values(PG_FUNCTION_ARGS)
                                995                 : {
 2199                           996              38 :     Datum       relation = PG_GETARG_DATUM(0);
                                997                 : 
                                998              38 :     return DirectFunctionCall2(brin_summarize_range,
                                999                 :                                relation,
                               1000                 :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
                               1001                 : }
                               1002                 : 
                               1003                 : /*
                               1004                 :  * SQL-callable function to summarize the indicated page range, if not already
                               1005                 :  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
                               1006                 :  * unsummarized ranges are summarized.
                               1007                 :  */
                               1008                 : Datum
                               1009             101 : brin_summarize_range(PG_FUNCTION_ARGS)
                               1010                 : {
 3075                          1011             101 :     Oid         indexoid = PG_GETARG_OID(0);
 2199                          1012             101 :     int64       heapBlk64 = PG_GETARG_INT64(1);
                               1013                 :     BlockNumber heapBlk;
                               1014                 :     Oid         heapoid;
                               1015                 :     Relation    indexRel;
                               1016                 :     Relation    heapRel;
                               1017                 :     Oid         save_userid;
                               1018                 :     int         save_sec_context;
                               1019                 :     int         save_nestlevel;
 3075                          1020             101 :     double      numSummarized = 0;
                               1021                 : 
 1760                          1022             101 :     if (RecoveryInProgress())
 1760 alvherre                 1023 UBC           0 :         ereport(ERROR,
                               1024                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1025                 :                  errmsg("recovery is in progress"),
                               1026                 :                  errhint("BRIN control functions cannot be executed during recovery.")));
                               1027                 : 
 2199 alvherre                 1028 CBC         101 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
                               1029              18 :         ereport(ERROR,
                               1030                 :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                               1031                 :                  errmsg("block number out of range: %lld",
                               1032                 :                         (long long) heapBlk64)));
                               1033              83 :     heapBlk = (BlockNumber) heapBlk64;
                               1034                 : 
                               1035                 :     /*
                               1036                 :      * We must lock table before index to avoid deadlocks.  However, if the
                               1037                 :      * passed indexoid isn't an index then IndexGetRelation() will fail.
                               1038                 :      * Rather than emitting a not-very-helpful error message, postpone
                               1039                 :      * complaining, expecting that the is-it-an-index test below will fail.
                               1040                 :      */
 2661 tgl                      1041              83 :     heapoid = IndexGetRelation(indexoid, true);
                               1042              83 :     if (OidIsValid(heapoid))
                               1043                 :     {
 1539 andres                   1044              74 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
                               1045                 : 
                               1046                 :         /*
                               1047                 :          * Autovacuum calls us.  For its benefit, switch to the table owner's
                               1048                 :          * userid, so that any index functions are run as that user.  Also
                               1049                 :          * lock down security-restricted operations and arrange to make GUC
                               1050                 :          * variable changes local to this command.  This is harmless, albeit
                               1051                 :          * unnecessary, when called from SQL, because we fail shortly if the
                               1052                 :          * user does not own the index.
                               1053                 :          */
  335 noah                     1054              74 :         GetUserIdAndSecContext(&save_userid, &save_sec_context);
                               1055              74 :         SetUserIdAndSecContext(heapRel->rd_rel->relowner,
                               1056                 :                                save_sec_context | SECURITY_RESTRICTED_OPERATION);
                               1057              74 :         save_nestlevel = NewGUCNestLevel();
                               1058                 :     }
                               1059                 :     else
                               1060                 :     {
 2661 tgl                      1061               9 :         heapRel = NULL;
                               1062                 :         /* Set these just to suppress "uninitialized variable" warnings */
  312                          1063               9 :         save_userid = InvalidOid;
                               1064               9 :         save_sec_context = -1;
                               1065               9 :         save_nestlevel = -1;
                               1066                 :     }
                               1067                 : 
 3075 alvherre                 1068              83 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
                               1069                 : 
                               1070                 :     /* Must be a BRIN index */
 2661 tgl                      1071              74 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
                               1072              74 :         indexRel->rd_rel->relam != BRIN_AM_OID)
                               1073               9 :         ereport(ERROR,
                               1074                 :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                               1075                 :                  errmsg("\"%s\" is not a BRIN index",
                               1076                 :                         RelationGetRelationName(indexRel))));
                               1077                 : 
                               1078                 :     /* User must own the index (comparable to privileges needed for VACUUM) */
  147 peter                    1079 GNC          65 :     if (heapRel != NULL && !object_ownercheck(RelationRelationId, indexoid, save_userid))
 1954 peter_e                  1080 UBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
 2661 tgl                      1081               0 :                        RelationGetRelationName(indexRel));
                               1082                 : 
                               1083                 :     /*
                               1084                 :      * Since we did the IndexGetRelation call above without any lock, it's
                               1085                 :      * barely possible that a race against an index drop/recreation could have
                               1086                 :      * netted us the wrong table.  Recheck.
                               1087                 :      */
 2661 tgl                      1088 CBC          65 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
 2661 tgl                      1089 UBC           0 :         ereport(ERROR,
                               1090                 :                 (errcode(ERRCODE_UNDEFINED_TABLE),
                               1091                 :                  errmsg("could not open parent table of index \"%s\"",
                               1092                 :                         RelationGetRelationName(indexRel))));
                               1093                 : 
                               1094                 :     /* OK, do it */
 1983 alvherre                 1095 CBC          65 :     brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
                               1096                 : 
                               1097                 :     /* Roll back any GUC changes executed by index functions */
  335 noah                     1098              65 :     AtEOXact_GUC(false, save_nestlevel);
                               1099                 : 
                               1100                 :     /* Restore userid and security context */
                               1101              65 :     SetUserIdAndSecContext(save_userid, save_sec_context);
                               1102                 : 
 3075 alvherre                 1103              65 :     relation_close(indexRel, ShareUpdateExclusiveLock);
                               1104              65 :     relation_close(heapRel, ShareUpdateExclusiveLock);
                               1105                 : 
                               1106              65 :     PG_RETURN_INT32((int32) numSummarized);
                               1107                 : }
                               1108                 : 
                               1109                 : /*
                               1110                 :  * SQL-callable interface to mark a range as no longer summarized
                                                       *
                                                       * Argument 0 is the OID of the index; argument 1 is the heap block number,
                                                       * received as int64 so that values outside [0, MaxBlockNumber] can be
                                                       * rejected with an error before narrowing to BlockNumber.  Returns void.
                                                       *
                                                       * Raises an error if recovery is in progress, if the relation is not a
                                                       * BRIN index, if the user reported by GetUserId() does not own the index,
                                                       * or if the parent table could not be determined or no longer matches.
                                                       *
                                                       * Both the table and the index are opened with ShareUpdateExclusiveLock,
                                                       * table first.
                               1111                 :  */
                               1112                 : Datum
 2199                          1113              51 : brin_desummarize_range(PG_FUNCTION_ARGS)
                               1114                 : {
 2153 bruce                    1115              51 :     Oid         indexoid = PG_GETARG_OID(0);
                               1116              51 :     int64       heapBlk64 = PG_GETARG_INT64(1);
                               1117                 :     BlockNumber heapBlk;
                               1118                 :     Oid         heapoid;
                               1119                 :     Relation    heapRel;
                               1120                 :     Relation    indexRel;
                               1121                 :     bool        done;
                               1122                 : 
 1760 alvherre                 1123              51 :     if (RecoveryInProgress())
 1760 alvherre                 1124 UBC           0 :         ereport(ERROR,
                               1125                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1126                 :                  errmsg("recovery is in progress"),
                               1127                 :                  errhint("BRIN control functions cannot be executed during recovery.")));
                               1128                 : 
 2199 alvherre                 1129 CBC          51 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
                               1130               9 :         ereport(ERROR,
                               1131                 :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                               1132                 :                  errmsg("block number out of range: %lld",
                               1133                 :                         (long long) heapBlk64)));
                               1134              42 :     heapBlk = (BlockNumber) heapBlk64; /* in range: checked just above */
                               1135                 : 
                               1136                 :     /*
                               1137                 :      * We must lock table before index to avoid deadlocks.  However, if the
                               1138                 :      * passed indexoid isn't an index then IndexGetRelation() will fail.
                               1139                 :      * Rather than emitting a not-very-helpful error message, postpone
                               1140                 :      * complaining, expecting that the is-it-an-index test below will fail.
                               1141                 :      *
                               1142                 :      * Unlike brin_summarize_range(), autovacuum never calls this.  Hence, we
                               1143                 :      * don't switch userid.
                               1144                 :      */
                               1145              42 :     heapoid = IndexGetRelation(indexoid, true);
                               1146              42 :     if (OidIsValid(heapoid))
 1539 andres                   1147              42 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
                               1148                 :     else
 2199 alvherre                 1149 UBC           0 :         heapRel = NULL; /* reported by one of the checks further down */
                               1150                 : 
 2199 alvherre                 1151 CBC          42 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
                               1152                 : 
                               1153                 :     /* Must be a BRIN index */
                               1154              42 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
                               1155              42 :         indexRel->rd_rel->relam != BRIN_AM_OID)
 2199 alvherre                 1156 UBC           0 :         ereport(ERROR,
                               1157                 :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                               1158                 :                  errmsg("\"%s\" is not a BRIN index",
                               1159                 :                         RelationGetRelationName(indexRel))));
                               1160                 : 
                               1161                 :     /* User must own the index (comparable to privileges needed for VACUUM) */
  147 peter                    1162 GNC          42 :     if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
 1954 peter_e                  1163 UBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
 2199 alvherre                 1164               0 :                        RelationGetRelationName(indexRel));
                               1165                 : 
                               1166                 :     /*
                               1167                 :      * Since we did the IndexGetRelation call above without any lock, it's
                               1168                 :      * barely possible that a race against an index drop/recreation could have
                               1169                 :      * netted us the wrong table.  Recheck.
                               1170                 :      */
 2199 alvherre                 1171 CBC          42 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
 2199 alvherre                 1172 UBC           0 :         ereport(ERROR,
                               1173                 :                 (errcode(ERRCODE_UNDEFINED_TABLE),
                               1174                 :                  errmsg("could not open parent table of index \"%s\"",
                               1175                 :                         RelationGetRelationName(indexRel))));
                               1176                 : 
                               1177                 :     /* the revmap does the hard work; call it again until it reports done */
                               1178                 :     do
                               1179                 :     {
 2199 alvherre                 1180 CBC          42 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
                               1181                 :     }
                               1182              42 :     while (!done);
                               1183                 : 
                               1184              42 :     relation_close(indexRel, ShareUpdateExclusiveLock);
                               1185              42 :     relation_close(heapRel, ShareUpdateExclusiveLock);
                               1186                 : 
                               1187              42 :     PG_RETURN_VOID();
                               1188                 : }
                               1189                 : 
                               1190                 : /*
                               1191                 :  * Build a BrinDesc used to create or scan a BRIN index
                                                       *
                                                       * A new memory context named "brin desc cxt" is created under
                                                       * CurrentMemoryContext and is made current while the descriptor is built;
                                                       * the caller's context is switched back in before returning.  The new
                                                       * context is recorded in bd_context, which is what brin_free_desc()
                                                       * deletes.
                                                       *
                                                       * For every attribute of the index's tuple descriptor, the opclass's
                                                       * BRIN_PROCNUM_OPCINFO support function is called with the attribute's
                                                       * type OID; the resulting BrinOpcInfo pointers are stored in bd_info[],
                                                       * and the sum of their oi_nstored values is stored in bd_totalstored.
                                                       *
                                                       * bd_tupdesc is the pointer returned by RelationGetDescr(rel), and
                                                       * bd_index is rel itself.
                               1192                 :  */
                               1193                 : BrinDesc *
 3075                          1194            1968 : brin_build_desc(Relation rel)
                               1195                 : {
                               1196                 :     BrinOpcInfo **opcinfo;
                               1197                 :     BrinDesc   *bdesc;
                               1198                 :     TupleDesc   tupdesc;
                               1199            1968 :     int         totalstored = 0;
                               1200                 :     int         keyno;
                               1201                 :     long        totalsize;
                               1202                 :     MemoryContext cxt;
                               1203                 :     MemoryContext oldcxt;
                               1204                 : 
                               1205            1968 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
                               1206                 :                                 "brin desc cxt",
                               1207                 :                                 ALLOCSET_SMALL_SIZES);
                               1208            1968 :     oldcxt = MemoryContextSwitchTo(cxt);
                               1209            1968 :     tupdesc = RelationGetDescr(rel);
                               1210                 : 
                               1211                 :     /*
                               1212                 :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
                               1213                 :      * the number of columns stored, since the number is opclass-defined.
                               1214                 :      */
  209 peter                    1215 GNC        1968 :     opcinfo = palloc_array(BrinOpcInfo*, tupdesc->natts);
 3075 alvherre                 1216 CBC       37096 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
                               1217                 :     {
                               1218                 :         FmgrInfo   *opcInfoFn;
 2058 andres                   1219           35128 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
                               1220                 : 
                                                              /* support procedures are looked up by 1-based attribute number */
 3075 alvherre                 1221           35128 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
                               1222                 : 
                               1223           70256 :         opcinfo[keyno] = (BrinOpcInfo *)
 2058 andres                   1224           35128 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
 3075 alvherre                 1225           35128 :         totalstored += opcinfo[keyno]->oi_nstored;
                               1226                 :     }
                               1227                 : 
                               1228                 :     /*
                                                           * Allocate our result struct and fill it in.  Its size is everything
                                                           * in BrinDesc up to bd_info, plus one BrinOpcInfo pointer per index
                                                           * attribute for the bd_info array itself.
                                                           */
                               1229            1968 :     totalsize = offsetof(BrinDesc, bd_info) +
                               1230            1968 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
                               1231                 : 
                               1232            1968 :     bdesc = palloc(totalsize);
                               1233            1968 :     bdesc->bd_context = cxt;
                               1234            1968 :     bdesc->bd_index = rel;
                               1235            1968 :     bdesc->bd_tupdesc = tupdesc;
                               1236            1968 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
                               1237            1968 :     bdesc->bd_totalstored = totalstored;
                               1238                 : 
                               1239           37096 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
                               1240           35128 :         bdesc->bd_info[keyno] = opcinfo[keyno];
                               1241            1968 :     pfree(opcinfo); /* only the temporary array; its entries now live in bd_info */
                               1242                 : 
                               1243            1968 :     MemoryContextSwitchTo(oldcxt);
                               1244                 : 
                               1245            1968 :     return bdesc;
                               1246                 : }
                               1247                 : 
                                                      /*
                                                       * Release a BrinDesc by deleting the memory context recorded in its
                                                       * bd_context field (brin_build_desc() stores the context it creates there).
                                                       * In assert-enabled builds, first check that the descriptor's tuple
                                                       * descriptor still has a reference count of at least one.
                                                       */
                               1248                 : void
                               1249            1458 : brin_free_desc(BrinDesc *bdesc)
                               1250                 : {
                               1251                 :     /* make sure the tupdesc is still valid */
                               1252            1458 :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
                               1253                 :     /* no need for retail pfree: deleting the context covers everything in it */
                               1254            1458 :     MemoryContextDelete(bdesc->bd_context);
                               1255            1458 : }
                               1256                 : 
                               1257                 : /*
                               1258                 :  * Fetch index's statistical data into *stats
                                                       *
                                                       * Reads block BRIN_METAPAGE_BLKNO of the index under a share buffer lock
                                                       * and copies two values out of its BrinMetaPageData: pagesPerRange as-is,
                                                       * and revmapNumPages as lastRevmapPage - 1.  The buffer is unlocked and
                                                       * released before returning.
                                                       *
                                                       * NOTE(review): the "- 1" presumably accounts for the metapage occupying
                                                       * the block just before the first revmap page -- confirm against the
                                                       * BRIN README.
                               1259                 :  */
                               1260                 : void
 2194                          1261            5178 : brinGetStats(Relation index, BrinStatsData *stats)
                               1262                 : {
                               1263                 :     Buffer      metabuffer;
                               1264                 :     Page        metapage;
                               1265                 :     BrinMetaPageData *metadata;
                               1266                 : 
                               1267            5178 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
                               1268            5178 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
                               1269            5178 :     metapage = BufferGetPage(metabuffer);
                               1270            5178 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
                               1271                 : 
                               1272            5178 :     stats->pagesPerRange = metadata->pagesPerRange;
                               1273            5178 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;
                               1274                 : 
                               1275            5178 :     UnlockReleaseBuffer(metabuffer);
                               1276            5178 : }
                               1277                 : 
                               1278                 : /*
                               1279                 :  * Initialize a BrinBuildState appropriate to create tuples on the given index.
                                                       *
                                                       * The state is palloc'd and remembers idxRel, revmap and pagesPerRange as
                                                       * passed in.  The tuple count and current range start begin at zero, and
                                                       * there is no current insertion buffer yet.  A fresh BrinDesc is built
                                                       * for the index with brin_build_desc(), and a memory tuple for that
                                                       * descriptor is created with brin_new_memtuple().
                                                       *
                                                       * terminate_brin_buildstate() is the matching release routine.
                               1280                 :  */
                               1281                 : static BrinBuildState *
 3075                          1282             158 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
                               1283                 :                            BlockNumber pagesPerRange)
                               1284                 : {
                               1285                 :     BrinBuildState *state;
                               1286                 : 
  209 peter                    1287 GNC         158 :     state = palloc_object(BrinBuildState);
                               1288                 : 
 3075 alvherre                 1289 CBC         158 :     state->bs_irel = idxRel;
                               1290             158 :     state->bs_numtuples = 0;
                               1291             158 :     state->bs_currentInsertBuf = InvalidBuffer;
                               1292             158 :     state->bs_pagesPerRange = pagesPerRange;
                               1293             158 :     state->bs_currRangeStart = 0;
                               1294             158 :     state->bs_rmAccess = revmap;
                               1295             158 :     state->bs_bdesc = brin_build_desc(idxRel);
                               1296             158 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
                               1297                 : 
                               1298             158 :     return state;
                               1299                 : }
                               1300                 : 
                               1301                 : /*
                               1302                 :  * Release resources associated with a BrinBuildState.
                                                       *
                                                       * If an insertion buffer is still held, it is released and the free space
                                                       * left on its page is recorded in the free space map.  Then the BrinDesc
                                                       * is freed with brin_free_desc(), and bs_dtuple and the state struct
                                                       * itself are pfree'd.  The revmap handle (bs_rmAccess) and the index
                                                       * relation (bs_irel) are not released here.
                               1303                 :  */
                               1304                 : static void
                               1305             158 : terminate_brin_buildstate(BrinBuildState *state)
                               1306                 : {
                               1307                 :     /*
                               1308                 :      * Release the last index buffer used.  We might as well ensure that
                               1309                 :      * whatever free space remains in that page is available in FSM, too.
                               1310                 :      */
                               1311             158 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
                               1312                 :     {
                               1313                 :         Page        page;
                               1314                 :         Size        freespace;
                               1315                 :         BlockNumber blk;
                               1316                 : 
                                                              /* read what we need from the buffer before letting go of it */
 2545 kgrittn                  1317             119 :         page = BufferGetPage(state->bs_currentInsertBuf);
 1831 tgl                      1318             119 :         freespace = PageGetFreeSpace(page);
                               1319             119 :         blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
 3075 alvherre                 1320             119 :         ReleaseBuffer(state->bs_currentInsertBuf);
 1433 akapila                  1321             119 :         RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
 1831 tgl                      1322             119 :         FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1); /* just this one block */
                               1323                 :     }
                               1324                 : 
 3075 alvherre                 1325             158 :     brin_free_desc(state->bs_bdesc);
                               1326             158 :     pfree(state->bs_dtuple);
                               1327             158 :     pfree(state);
                               1328             158 : }
                               1329                 : 
                               1330                 : /*
                               1331                 :  * On the given BRIN index, summarize the heap page range that corresponds
                               1332                 :  * to the heap block number given.
                               1333                 :  *
                               1334                 :  * This routine can run in parallel with insertions into the heap.  To avoid
                               1335                 :  * missing those values from the summary tuple, we first insert a placeholder
                               1336                 :  * index tuple into the index, then execute the heap scan; transactions
                               1337                 :  * concurrent with the scan update the placeholder tuple.  After the scan, we
                               1338                 :  * union the placeholder tuple with the one computed by this routine.  The
                               1339                 :  * update of the index value happens in a loop, so that if somebody updates
                               1340                 :  * the placeholder tuple after we read it, we detect the case and try again.
                               1341                 :  * This ensures that the concurrently inserted tuples are not lost.
                               1342                 :  *
                               1343                 :  * A further corner case is this routine being asked to summarize the partial
                               1344                 :  * range at the end of the table.  heapNumBlks is the (possibly outdated)
                               1345                 :  * table size; if we notice that the requested range lies beyond that size,
                               1346                 :  * we re-compute the table size after inserting the placeholder tuple, to
                               1347                 :  * avoid missing pages that were appended recently.
                               1348                 :  */
                               1349                 : static void
                               1350            1467 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
                               1351                 :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
                               1352                 : {
                               1353                 :     Buffer      phbuf;
                               1354                 :     BrinTuple  *phtup;
                               1355                 :     Size        phsz;
                               1356                 :     OffsetNumber offset;
                               1357                 :     BlockNumber scanNumBlks;
                               1358                 : 
                               1359                 :     /*
                               1360                 :      * Insert the placeholder tuple
                               1361                 :      */
                               1362            1467 :     phbuf = InvalidBuffer;
                               1363            1467 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
                               1364            1467 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
                               1365                 :                            state->bs_rmAccess, &phbuf,
                               1366                 :                            heapBlk, phtup, phsz);
                               1367                 : 
                               1368                 :     /*
                               1369                 :      * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
                               1370                 :      * cannot shrink concurrently (but it can grow).
                               1371                 :      */
 1983                          1372            1467 :     Assert(heapBlk % state->bs_pagesPerRange == 0);
                               1373            1467 :     if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
                               1374                 :     {
                               1375                 :         /*
                               1376                 :          * If we're asked to scan what we believe to be the final range on the
                               1377                 :          * table (i.e. a range that might be partial) we need to recompute our
                               1378                 :          * idea of what the latest page is after inserting the placeholder
                               1379                 :          * tuple.  Anyone that grows the table later will update the
                               1380                 :          * placeholder tuple, so it doesn't matter that we won't scan these
                               1381                 :          * pages ourselves.  Careful: the table might have been extended
                               1382                 :          * beyond the current range, so clamp our result.
                               1383                 :          *
                               1384                 :          * Fortunately, this should occur infrequently.
                               1385                 :          */
                               1386              12 :         scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
                               1387                 :                           state->bs_pagesPerRange);
                               1388                 :     }
                               1389                 :     else
                               1390                 :     {
                               1391                 :         /* Easy case: range is known to be complete */
                               1392            1455 :         scanNumBlks = state->bs_pagesPerRange;
                               1393                 :     }
                               1394                 : 
                               1395                 :     /*
                               1396                 :      * Execute the partial heap scan covering the heap blocks in the specified
                               1397                 :      * page range, summarizing the heap tuples in it.  This scan stops just
                               1398                 :      * short of brinbuildCallback creating the new index entry.
                               1399                 :      *
                               1400                 :      * Note that it is critical we use the "any visible" mode of
                               1401                 :      * table_index_build_range_scan here: otherwise, we would miss tuples
                               1402                 :      * inserted by transactions that are still in progress, among other corner
                               1403                 :      * cases.
                               1404                 :      */
 3075                          1405            1467 :     state->bs_currRangeStart = heapBlk;
 1468                          1406            1467 :     table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
                               1407                 :                                  heapBlk, scanNumBlks,
                               1408                 :                                  brinbuildCallback, (void *) state, NULL);
                               1409                 : 
                               1410                 :     /*
                               1411                 :      * Now we update the values obtained by the scan with the placeholder
                               1412                 :      * tuple.  We do this in a loop which only terminates if we're able to
                               1413                 :      * update the placeholder tuple successfully; if we are not, this means
                               1414                 :      * somebody else modified the placeholder tuple after we read it.
                               1415                 :      */
                               1416                 :     for (;;)
 3075 alvherre                 1417 UBC           0 :     {
                               1418                 :         BrinTuple  *newtup;
                               1419                 :         Size        newsize;
                               1420                 :         bool        didupdate;
                               1421                 :         bool        samepage;
                               1422                 : 
 3075 alvherre                 1423 CBC        1467 :         CHECK_FOR_INTERRUPTS();
                               1424                 : 
                               1425                 :         /*
                               1426                 :          * Update the summary tuple and try to update.
                               1427                 :          */
                               1428            1467 :         newtup = brin_form_tuple(state->bs_bdesc,
                               1429                 :                                  heapBlk, state->bs_dtuple, &newsize);
                               1430            1467 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
                               1431                 :         didupdate =
                               1432            1467 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
                               1433                 :                           state->bs_rmAccess, heapBlk, phbuf, offset,
                               1434                 :                           phtup, phsz, newtup, newsize, samepage);
                               1435            1467 :         brin_free_tuple(phtup);
                               1436            1467 :         brin_free_tuple(newtup);
                               1437                 : 
                               1438                 :         /* If the update succeeded, we're done. */
                               1439            1467 :         if (didupdate)
                               1440            1467 :             break;
                               1441                 : 
                               1442                 :         /*
                               1443                 :          * If the update didn't work, it might be because somebody updated the
                               1444                 :          * placeholder tuple concurrently.  Extract the new version, union it
                               1445                 :          * with the values we have from the scan, and start over.  (There are
                               1446                 :          * other reasons for the update to fail, but it's simple to treat them
                               1447                 :          * the same.)
                               1448                 :          */
 3075 alvherre                 1449 UBC           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
                               1450                 :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
                               1451                 :                                          NULL);
                               1452                 :         /* the placeholder tuple must exist */
                               1453               0 :         if (phtup == NULL)
                               1454               0 :             elog(ERROR, "missing placeholder tuple");
 2193                          1455               0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
 3075                          1456               0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
                               1457                 : 
                               1458                 :         /* merge it into the tuple from the heap scan */
                               1459               0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
                               1460                 :     }
                               1461                 : 
 3075 alvherre                 1462 CBC        1467 :     ReleaseBuffer(phbuf);
                               1463            1467 : }
                               1464                 : 
                               1465                 : /*
                               1466                 :  * Summarize page ranges that are not already summarized.  If pageRange is
                               1467                 :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
                               1468                 :  * page range containing the given heap page number is scanned.
                               1469                 :  * If include_partial is true, then the partial range at the end of the table
                               1470                 :  * is summarized, otherwise not.
                               1471                 :  *
                               1472                 :  * For each new index tuple inserted, *numSummarized (if not NULL) is
                               1473                 :  * incremented; for each existing tuple, *numExisting (if not NULL) is
                               1474                 :  * incremented.
                                                       *
                                                       * The build state and the IndexInfo are created lazily, the first time an
                                                       * unsummarized range is found, and are released together before returning.
                                                       * The revmap access object is always terminated before returning.
                               1475                 :  */
                               1476                 : static void
 2199                          1477             107 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
                               1478                 :               bool include_partial, double *numSummarized, double *numExisting)
                               1479                 : {
                               1480                 :     BrinRevmap *revmap;
 3075                          1481             107 :     BrinBuildState *state = NULL;
                               1482             107 :     IndexInfo  *indexInfo = NULL;
                               1483                 :     BlockNumber heapNumBlocks;
                               1484                 :     BlockNumber pagesPerRange;
                               1485                 :     Buffer      buf;
                               1486                 :     BlockNumber startBlk;
                               1487                 : 
 2557 kgrittn                  1488             107 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
                               1489                 : 
                               1490                 :     /* determine range of pages to process */
 1983 alvherre                 1491             107 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
 2199                          1492             107 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
                               1493              71 :         startBlk = 0;
                               1494                 :     else
                               1495                 :     {
                                                              /*
                                                               * Round pageRange down to the first heap block of its range, and
                                                               * stop the scan at the end of that single range (or at the end of
                                                               * the table, whichever comes first).
                                                               */
                               1496              36 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange;
 1983                          1497              36 :         heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
                               1498                 :     }
                               1499             107 :     if (startBlk > heapNumBlocks)
                               1500                 :     {
                               1501                 :         /* Nothing to do if start point is beyond end of table */
 1983 alvherre                 1502 UBC           0 :         brinRevmapTerminate(revmap);
                               1503               0 :         return;
                               1504                 :     }
                               1505                 : 
                               1506                 :     /*
                               1507                 :      * Scan the revmap to find unsummarized items.
                               1508                 :      */
 3075 alvherre                 1509 CBC         107 :     buf = InvalidBuffer; /* handed to every lookup below; released once after the loop */
 1983                          1510            9469 :     for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
                               1511                 :     {
                               1512                 :         BrinTuple  *tup;
                               1513                 :         OffsetNumber off;
                               1514                 : 
                               1515                 :         /*
                               1516                 :          * Unless requested to summarize even a partial range, go away now if
                               1517                 :          * we think the next range is partial.  Caller would pass true when it
                               1518                 :          * is typically run once bulk data loading is done
                               1519                 :          * (brin_summarize_new_values), and false when it is typically the
                               1520                 :          * result of arbitrarily-scheduled maintenance command (vacuuming).
                               1521                 :          */
                               1522            9394 :         if (!include_partial &&
                               1523            1024 :             (startBlk + pagesPerRange > heapNumBlocks))
                               1524              32 :             break;
                               1525                 : 
 3075                          1526            9362 :         CHECK_FOR_INTERRUPTS();
                               1527                 : 
 1983                          1528            9362 :         tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
                               1529                 :                                        BUFFER_LOCK_SHARE, NULL);
 3075                          1530            9362 :         if (tup == NULL)
                               1531                 :         {
                               1532                 :             /* no revmap entry for this heap range. Summarize it. */
                               1533            1467 :             if (state == NULL)
                               1534                 :             {
                               1535                 :                 /* first time through */
                               1536              39 :                 Assert(!indexInfo);
                               1537              39 :                 state = initialize_brin_buildstate(index, revmap,
                               1538                 :                                                    pagesPerRange);
                               1539              39 :                 indexInfo = BuildIndexInfo(index);
                               1540                 :             }
 1983                          1541            1467 :             summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
                               1542                 : 
                               1543                 :             /* and re-initialize state for the next range */
 3075                          1544            1467 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
                               1545                 : 
                               1546            1467 :             if (numSummarized)
                               1547            1467 :                 *numSummarized += 1.0;
                               1548                 :         }
                               1549                 :         else
                               1550                 :         {
                                                                  /* a summary tuple already exists: count it and drop the lock */
                               1551            7895 :             if (numExisting)
                               1552             946 :                 *numExisting += 1.0;
                               1553            7895 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                               1554                 :         }
                               1555                 :     }
                               1556                 : 
                                                          /* release the buffer left over from the last lookup, if there was one */
                               1557             107 :     if (BufferIsValid(buf))
                               1558              75 :         ReleaseBuffer(buf);
                               1559                 : 
                               1560                 :     /* free resources */
                               1561             107 :     brinRevmapTerminate(revmap);
                                                          /* state and indexInfo are only ever created together, so one test covers both */
                               1562             107 :     if (state)
                               1563                 :     {
                               1564              39 :         terminate_brin_buildstate(state);
 2804                          1565              39 :         pfree(indexInfo);
                               1566                 :     }
                               1567                 : }
                               1568                 : 
                               1569                 : /*
                               1570                 :  * Given a deformed tuple in the build state, convert it into the on-disk
                               1571                 :  * format and insert it into the index, making the revmap point to it.
                                                       *
                                                       * The tuple is formed for the range starting at state->bs_currRangeStart.
                                                       * On return state->bs_numtuples has been incremented and the temporary
                                                       * on-disk tuple has been freed; state->bs_dtuple itself is not reset here.
                               1572                 :  */
                               1573                 : static void
 3075                          1574            1124 : form_and_insert_tuple(BrinBuildState *state)
                               1575                 : {
                               1576                 :     BrinTuple  *tup;
                               1577                 :     Size        size;
                               1578                 : 
                                                          /* brin_form_tuple also reports the size of the tuple it built */
                               1579            1124 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
                               1580                 :                           state->bs_dtuple, &size);
                               1581            1124 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
                               1582                 :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
                               1583                 :                   tup, size);
                               1584            1124 :     state->bs_numtuples++;
                               1585                 : 
                               1586            1124 :     pfree(tup); /* the on-disk image is not needed once it has been inserted */
                               1587            1124 : }
                               1588                 : 
                               1589                 : /*
                               1590                 :  * Given two deformed tuples, adjust the first one so that it's consistent
                               1591                 :  * with the summary values in both.
                                                       *
                                                       * 'a' is modified in place.  'b' is an on-disk tuple; it is deformed into a
                                                       * private memory context that is deleted before returning, so nothing that
                                                       * must survive the call may be left pointing into the deformed copy of 'b'.
                               1592                 :  */
                               1593                 : static void
 3075 alvherre                 1594 UBC           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
                               1595                 : {
                               1596                 :     int         keyno;
                               1597                 :     BrinMemTuple *db;
                               1598                 :     MemoryContext cxt;
                               1599                 :     MemoryContext oldcxt;
                               1600                 : 
                               1601                 :     /* Use our own memory context to avoid retail pfree */
                               1602               0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
                               1603                 :                                 "brin union",
                               1604                 :                                 ALLOCSET_DEFAULT_SIZES);
                               1605               0 :     oldcxt = MemoryContextSwitchTo(cxt);
 2193                          1606               0 :     db = brin_deform_tuple(bdesc, b, NULL);
 3075                          1607               0 :     MemoryContextSwitchTo(oldcxt);
                                                          /* from here on we run in the caller's context again; only 'db' lives in cxt */
                               1608                 : 
                               1609               0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
                               1610                 :     {
                               1611                 :         FmgrInfo   *unionFn;
                               1612               0 :         BrinValues *col_a = &a->bt_columns[keyno];
                               1613               0 :         BrinValues *col_b = &db->bt_columns[keyno];
  747 tomas.vondra             1614               0 :         BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
                               1615                 : 
                                                              /*
                                                               * The null flags are merged here only when oi_regular_nulls is set;
                                                               * otherwise this block is skipped and the union procedure below is
                                                               * always called for the column.
                                                               */
                               1616               0 :         if (opcinfo->oi_regular_nulls)
                               1617                 :         {
                               1618                 :             /* Adjust "hasnulls". */
                               1619               0 :             if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
                               1620               0 :                 col_a->bv_hasnulls = true;
                               1621                 : 
                               1622                 :             /* If there are no values in B, there's nothing left to do. */
                               1623               0 :             if (col_b->bv_allnulls)
                               1624               0 :                 continue;
                               1625                 : 
                               1626                 :             /*
                               1627                 :              * Adjust "allnulls".  If A doesn't have values, just copy the
                               1628                 :              * values from B into A, and we're done.  We cannot run the
                               1629                 :              * operators in this case, because values in A might contain
                               1630                 :              * garbage.  Note we already established that B contains values.
                               1631                 :              */
                               1632               0 :             if (col_a->bv_allnulls)
                               1633               0 :             {
                               1634                 :                 int         i;
                               1635                 : 
                               1636               0 :                 col_a->bv_allnulls = false;
                               1637                 : 
                                                                      /*
                                                                       * datumCopy is used rather than plain assignment because
                                                                       * col_b's values belong to cxt, which is deleted below.
                                                                       */
                               1638               0 :                 for (i = 0; i < opcinfo->oi_nstored; i++)
                               1639               0 :                     col_a->bv_values[i] =
                               1640               0 :                         datumCopy(col_b->bv_values[i],
                               1641               0 :                                   opcinfo->oi_typcache[i]->typbyval,
                               1642               0 :                                   opcinfo->oi_typcache[i]->typlen);
                               1643                 : 
                               1644               0 :                 continue;
                               1645                 :             }
                               1646                 :         }
                               1647                 : 
                                                              /* hand the pair to the opclass's BRIN_PROCNUM_UNION support procedure */
 3075 alvherre                 1648               0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
                               1649                 :                                     BRIN_PROCNUM_UNION);
                               1650               0 :         FunctionCall3Coll(unionFn,
                               1651               0 :                           bdesc->bd_index->rd_indcollation[keyno],
                               1652                 :                           PointerGetDatum(bdesc),
                               1653                 :                           PointerGetDatum(col_a),
                               1654                 :                           PointerGetDatum(col_b));
                               1655                 :     }
                               1656                 : 
                               1657               0 :     MemoryContextDelete(cxt); /* frees 'db' and everything else allocated in cxt */
                               1658               0 : }
                               1659                 : 
                               1660                 : /*
                               1661                 :  * brin_vacuum_scan
                               1662                 :  *      Do a complete scan of the index during VACUUM.
                               1663                 :  *
                               1664                 :  * This routine scans the complete index looking for uncatalogued index pages,
                               1665                 :  * i.e. those that might have been lost due to a crash after index extension
                               1666                 :  * and such.
                                                       *
                                                       * 'strategy' is passed through to ReadBufferExtended for every page read.
                                                       * Each page is handed to brin_page_cleanup and released again before the
                                                       * next one is read.
                               1667                 :  */
                               1668                 : static void
 2797 alvherre                 1669 CBC          42 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
                               1670                 : {
                               1671                 :     BlockNumber nblocks;
                               1672                 :     BlockNumber blkno;
                               1673                 : 
                               1674                 :     /*
                               1675                 :      * Scan the index in physical order, and clean up any possible mess in
                               1676                 :      * each page.
                               1677                 :      */
 1831 tgl                      1678              42 :     nblocks = RelationGetNumberOfBlocks(idxrel); /* sampled once, before the loop */
                               1679             225 :     for (blkno = 0; blkno < nblocks; blkno++)
                               1680                 :     {
                               1681                 :         Buffer      buf;
                               1682                 : 
 2797 alvherre                 1683             183 :         CHECK_FOR_INTERRUPTS();
                               1684                 : 
                               1685             183 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
                               1686                 :                                  RBM_NORMAL, strategy);
                               1687                 : 
 1831 tgl                      1688             183 :         brin_page_cleanup(idxrel, buf);
                               1689                 : 
 2797 alvherre                 1690             183 :         ReleaseBuffer(buf);
                               1691                 :     }
                               1692                 : 
                               1693                 :     /*
                               1694                 :      * Update all upper pages in the index's FSM, as well.  This ensures not
                               1695                 :      * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
                               1696                 :      * but also that any pre-existing damage or out-of-dateness is repaired.
                               1697                 :      */
 1831 tgl                      1698              42 :     FreeSpaceMapVacuum(idxrel);
 2797 alvherre                 1699              42 : }
                               1700                 : 
                                                      /*
                                                       * add_values_to_range
                                                       *      Fold one set of index column values into a deformed summary tuple.
                                                       *
                                                       * 'values' and 'nulls' are indexed by column (0 .. natts-1 of the index's
                                                       * tuple descriptor).  For each column, a NULL input is recorded directly in
                                                       * the column's bv_hasnulls flag when the opclass uses regular NULL handling;
                                                       * every other input is passed to the opclass's BRIN_PROCNUM_ADDVALUE support
                                                       * procedure, called with the index column's collation.
                                                       *
                                                       * Returns true if 'dtup' was changed for at least one column, false if the
                                                       * existing summary already covered all the values.
                                                       */
                               1701                 : static bool
  747 tomas.vondra             1702          354407 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
                               1703                 :                     Datum *values, bool *nulls)
                               1704                 : {
                               1705                 :     int         keyno;
                               1706          354407 :     bool        modified = false;
                               1707                 : 
                               1708                 :     /*
                               1709                 :      * Compare the key values of the new tuple to the stored index values; our
                               1710                 :      * deformed tuple will get updated if the new tuple doesn't fit the
                               1711                 :      * original range (note this means we can't break out of the loop early).
                               1712                 :      * Make a note of whether this happens, so that we know to insert the
                               1713                 :      * modified tuple later.
                               1714                 :      */
                               1715          782761 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
                               1716                 :     {
                               1717                 :         Datum       result;
                               1718                 :         BrinValues *bval;
                               1719                 :         FmgrInfo   *addValue;
                               1720                 : 
                               1721          428354 :         bval = &dtup->bt_columns[keyno];
                               1722                 : 
                               1723          428354 :         if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
                               1724                 :         {
                               1725                 :             /*
                               1726                 :              * If the new value is null, we record that we saw it if it's the
                               1727                 :              * first one; otherwise, there's nothing to do.
                               1728                 :              */
                               1729            6972 :             if (!bval->bv_hasnulls)
                               1730                 :             {
                               1731            1680 :                 bval->bv_hasnulls = true;
                               1732            1680 :                 modified = true;
                               1733                 :             }
                               1734                 : 
                               1735            6972 :             continue;
                               1736                 :         }
                               1737                 : 
                                                              /*
                                                               * Past this point nulls[keyno] can only be true for an opclass that
                                                               * does not use regular NULL handling; the flag is passed along so
                                                               * the support procedure can deal with it.
                                                               */
                               1738          421382 :         addValue = index_getprocinfo(idxRel, keyno + 1,
                               1739                 :                                      BRIN_PROCNUM_ADDVALUE);
                               1740          421382 :         result = FunctionCall4Coll(addValue,
                               1741          421382 :                                    idxRel->rd_indcollation[keyno],
                               1742                 :                                    PointerGetDatum(bdesc),
                               1743                 :                                    PointerGetDatum(bval),
                               1744          421382 :                                    values[keyno],
                               1745          421382 :                                    nulls[keyno]);
                               1746                 :         /* if that returned true, we need to insert the updated tuple */
                               1747          421382 :         modified |= DatumGetBool(result);
                               1748                 :     }
                               1749                 : 
                               1750          354407 :     return modified;
                               1751                 : }
                               1752                 : 
                                                      /*
                                                       * check_null_keys
                                                       *      Test a column summary against the scan keys that carry SK_ISNULL.
                                                       *
                                                       * 'nullkeys' holds 'nnullkeys' scan keys, all for the attribute summarized
                                                       * by 'bval' (asserted below).  Keys without SK_ISNULL are skipped.
                                                       *
                                                       * Returns false as soon as one key shows that the range cannot contain a
                                                       * matching row; returns true if no key rules the range out (including when
                                                       * there are no keys at all).
                                                       */
                               1753                 : static bool
                               1754           93927 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
                               1755                 : {
                               1756                 :     int         keyno;
                               1757                 : 
                               1758                 :     /*
                               1759                 :      * First check if there are any IS [NOT] NULL scan keys, and if we're
                               1760                 :      * violating them.
                               1761                 :      */
                               1762           94545 :     for (keyno = 0; keyno < nnullkeys; keyno++)
                               1763                 :     {
                               1764            1116 :         ScanKey     key = nullkeys[keyno];
                               1765                 : 
                               1766            1116 :         Assert(key->sk_attno == bval->bv_attno);
                               1767                 : 
                               1768                 :         /* Handle only IS NULL/IS NOT NULL tests */
                               1769            1116 :         if (!(key->sk_flags & SK_ISNULL))
  747 tomas.vondra             1770 UBC           0 :             continue;
                               1771                 : 
  747 tomas.vondra             1772 CBC        1116 :         if (key->sk_flags & SK_SEARCHNULL)
                               1773                 :         {
                               1774                 :             /* IS NULL scan key, but range has no NULLs */
                               1775             558 :             if (!bval->bv_allnulls && !bval->bv_hasnulls)
                               1776             489 :                 return false;
                               1777                 :         }
                               1778             558 :         else if (key->sk_flags & SK_SEARCHNOTNULL)
                               1779                 :         {
                               1780                 :             /*
                               1781                 :              * For IS NOT NULL, we can only skip ranges that are known to have
                               1782                 :              * only nulls.
                               1783                 :              */
                               1784             558 :             if (bval->bv_allnulls)
                               1785               9 :                 return false;
                               1786                 :         }
                               1787                 :         else
                               1788                 :         {
                               1789                 :             /*
                               1790                 :              * Neither IS NULL nor IS NOT NULL was used; assume all indexable
                               1791                 :              * operators are strict and thus return false with NULL value in
                               1792                 :              * the scan key.
                               1793                 :              */
  747 tomas.vondra             1794 UBC           0 :             return false;
                               1795                 :         }
                               1796                 :     }
                               1797                 : 
                                                          /* no key excluded the range */
  747 tomas.vondra             1798 CBC       93429 :     return true;
                               1799                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a