LCOV - differential code coverage report
Current view: top level - src/backend/commands - matview.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 91.3 % 264 241 13 8 2 5 130 3 103 16 122 3
Current Date: 2023-04-08 15:15:32 Functions: 93.3 % 15 14 1 12 1 1 1 12
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * matview.c
       4                 :  *    materialized view support
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/commands/matview.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include "access/genam.h"
      18                 : #include "access/heapam.h"
      19                 : #include "access/htup_details.h"
      20                 : #include "access/multixact.h"
      21                 : #include "access/tableam.h"
      22                 : #include "access/xact.h"
      23                 : #include "access/xlog.h"
      24                 : #include "catalog/catalog.h"
      25                 : #include "catalog/indexing.h"
      26                 : #include "catalog/namespace.h"
      27                 : #include "catalog/pg_am.h"
      28                 : #include "catalog/pg_opclass.h"
      29                 : #include "catalog/pg_operator.h"
      30                 : #include "commands/cluster.h"
      31                 : #include "commands/matview.h"
      32                 : #include "commands/tablecmds.h"
      33                 : #include "commands/tablespace.h"
      34                 : #include "executor/executor.h"
      35                 : #include "executor/spi.h"
      36                 : #include "miscadmin.h"
      37                 : #include "parser/parse_relation.h"
      38                 : #include "pgstat.h"
      39                 : #include "rewrite/rewriteHandler.h"
      40                 : #include "storage/lmgr.h"
      41                 : #include "storage/smgr.h"
      42                 : #include "tcop/tcopprot.h"
      43                 : #include "utils/builtins.h"
      44                 : #include "utils/lsyscache.h"
      45                 : #include "utils/rel.h"
      46                 : #include "utils/snapmgr.h"
      47                 : #include "utils/syscache.h"
      48                 : 
      49                 : 
      50                 : typedef struct
      51                 : {
      52                 :     DestReceiver pub;           /* publicly-known function pointers */
      53                 :     Oid         transientoid;   /* OID of new heap into which to store */
      54                 :     /* These fields are filled by transientrel_startup: */
      55                 :     Relation    transientrel;   /* relation to write to */
      56                 :     CommandId   output_cid;     /* cmin to insert in output tuples */
      57                 :     int         ti_options;     /* table_tuple_insert performance options */
      58                 :     BulkInsertState bistate;    /* bulk insert state */
      59                 : } DR_transientrel;
      60                 : 
      61                 : static int  matview_maintenance_depth = 0;
      62                 : 
      63                 : static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
      64                 : static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
      65                 : static void transientrel_shutdown(DestReceiver *self);
      66                 : static void transientrel_destroy(DestReceiver *self);
      67                 : static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
      68                 :                                        const char *queryString);
      69                 : static char *make_temptable_name_n(char *tempname, int n);
      70                 : static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
      71                 :                                    int save_sec_context);
      72                 : static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
      73                 : static bool is_usable_unique_index(Relation indexRel);
      74                 : static void OpenMatViewIncrementalMaintenance(void);
      75                 : static void CloseMatViewIncrementalMaintenance(void);
      76                 : 
      77                 : /*
      78                 :  * SetMatViewPopulatedState
      79                 :  *      Mark a materialized view as populated, or not.
      80                 :  *
      81                 :  * NOTE: caller must be holding an appropriate lock on the relation.
      82                 :  */
      83                 : void
      84 CBC         285 : SetMatViewPopulatedState(Relation relation, bool newstate)
      85                 : {
      86                 :     Relation    pgrel;
      87                 :     HeapTuple   tuple;
      88                 : 
      89             285 :     Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
      90                 : 
      91                 :     /*
      92                 :      * Update relation's pg_class entry.  Crucial side-effect: other backends
      93                 :      * (and this one too!) are sent SI message to make them rebuild relcache
      94                 :      * entries.
      95                 :      */
      96             285 :     pgrel = table_open(RelationRelationId, RowExclusiveLock);
      97             285 :     tuple = SearchSysCacheCopy1(RELOID,
      98                 :                                 ObjectIdGetDatum(RelationGetRelid(relation)));
      99             285 :     if (!HeapTupleIsValid(tuple))
     100 UBC           0 :         elog(ERROR, "cache lookup failed for relation %u",
     101                 :              RelationGetRelid(relation));
     102                 : 
     103 CBC         285 :     ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
     104                 : 
     105             285 :     CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
     106                 : 
     107             285 :     heap_freetuple(tuple);
     108             285 :     table_close(pgrel, RowExclusiveLock);
     109                 : 
     110                 :     /*
     111                 :      * Advance command counter to make the updated pg_class row locally
     112                 :      * visible.
     113                 :      */
     114             285 :     CommandCounterIncrement();
     115             285 : }
     116                 : 
     117                 : /*
     118                 :  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
     119                 :  *
     120                 :  * This refreshes the materialized view by creating a new table and swapping
     121                 :  * the relfilenumbers of the new table and the old materialized view, so the OID
     122                 :  * of the original materialized view is preserved. Thus we do not lose GRANT
     123                 :  * nor references to this materialized view.
     124                 :  *
     125                 :  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
     126                 :  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
     127                 :  * statement associated with the materialized view.  The statement node's
     128                 :  * skipData field shows whether the clause was used.
     129                 :  *
     130                 :  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
     131                 :  * the new heap, it's better to create the indexes afterwards than to fill them
     132                 :  * incrementally while we load.
     133                 :  *
     134                 :  * The matview's "populated" state is changed based on whether the contents
     135                 :  * reflect the result set of the materialized view's query.
     136                 :  */
     137                 : ObjectAddress
     138             123 : ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
     139                 :                    ParamListInfo params, QueryCompletion *qc)
     140                 : {
     141                 :     Oid         matviewOid;
     142                 :     Relation    matviewRel;
     143                 :     RewriteRule *rule;
     144                 :     List       *actions;
     145                 :     Query      *dataQuery;
     146                 :     Oid         tableSpace;
     147                 :     Oid         relowner;
     148                 :     Oid         OIDNewHeap;
     149                 :     DestReceiver *dest;
     150             123 :     uint64      processed = 0;
     151                 :     bool        concurrent;
     152                 :     LOCKMODE    lockmode;
     153                 :     char        relpersistence;
     154                 :     Oid         save_userid;
     155                 :     int         save_sec_context;
     156                 :     int         save_nestlevel;
     157                 :     ObjectAddress address;
     158                 : 
     159                 :     /* Determine strength of lock needed. */
     160             123 :     concurrent = stmt->concurrent;
     161             123 :     lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
     162                 : 
     163                 :     /*
     164                 :      * Get a lock until end of transaction.
     165                 :      */
     166             123 :     matviewOid = RangeVarGetRelidExtended(stmt->relation,
     167                 :                                           lockmode, 0,
     168                 :                                           RangeVarCallbackMaintainsTable,
     169                 :                                           NULL);
     170 GIC         120 :     matviewRel = table_open(matviewOid, NoLock);
     171 CBC         120 :     relowner = matviewRel->rd_rel->relowner;
     172 ECB             : 
     173                 :     /*
     174                 :      * Switch to the owner's userid, so that any functions are run as that
     175                 :      * user.  Also lock down security-restricted operations and arrange to
     176                 :      * make GUC variable changes local to this command.
     177                 :      */
     178 GIC         120 :     GetUserIdAndSecContext(&save_userid, &save_sec_context);
     179 CBC         120 :     SetUserIdAndSecContext(relowner,
     180 ECB             :                            save_sec_context | SECURITY_RESTRICTED_OPERATION);
     181 GIC         120 :     save_nestlevel = NewGUCNestLevel();
     182 ECB             : 
     183                 :     /* Make sure it is a materialized view. */
     184 GIC         120 :     if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
     185 LBC           0 :         ereport(ERROR,
     186 EUB             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     187                 :                  errmsg("\"%s\" is not a materialized view",
     188                 :                         RelationGetRelationName(matviewRel))));
     189                 : 
     190                 :     /* Check that CONCURRENTLY is not specified if not populated. */
     191 GIC         120 :     if (concurrent && !RelationIsPopulated(matviewRel))
     192 LBC           0 :         ereport(ERROR,
     193 EUB             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     194                 :                  errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
     195                 : 
     196                 :     /* Check that conflicting options have not been specified. */
     197 GIC         120 :     if (concurrent && stmt->skipData)
     198 CBC           3 :         ereport(ERROR,
     199 ECB             :                 (errcode(ERRCODE_SYNTAX_ERROR),
     200                 :                  errmsg("%s and %s options cannot be used together",
     201                 :                         "CONCURRENTLY", "WITH NO DATA")));
     202                 : 
     203                 :     /*
     204                 :      * Check that everything is correct for a refresh. Problems at this point
     205                 :      * are internal errors, so elog is sufficient.
     206                 :      */
     207 GIC         117 :     if (matviewRel->rd_rel->relhasrules == false ||
     208 CBC         117 :         matviewRel->rd_rules->numLocks < 1)
     209 LBC           0 :         elog(ERROR,
     210 EUB             :              "materialized view \"%s\" is missing rewrite information",
     211                 :              RelationGetRelationName(matviewRel));
     212                 : 
     213 GIC         117 :     if (matviewRel->rd_rules->numLocks > 1)
     214 LBC           0 :         elog(ERROR,
     215 EUB             :              "materialized view \"%s\" has too many rules",
     216                 :              RelationGetRelationName(matviewRel));
     217                 : 
     218 GIC         117 :     rule = matviewRel->rd_rules->rules[0];
     219 CBC         117 :     if (rule->event != CMD_SELECT || !(rule->isInstead))
     220 LBC           0 :         elog(ERROR,
     221 EUB             :              "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
     222                 :              RelationGetRelationName(matviewRel));
     223                 : 
     224 GIC         117 :     actions = rule->actions;
     225 CBC         117 :     if (list_length(actions) != 1)
     226 LBC           0 :         elog(ERROR,
     227 EUB             :              "the rule for materialized view \"%s\" is not a single action",
     228                 :              RelationGetRelationName(matviewRel));
     229                 : 
     230                 :     /*
     231                 :      * Check that there is a unique index with no WHERE clause on one or more
     232                 :      * columns of the materialized view if CONCURRENTLY is specified.
     233                 :      */
     234 GIC         117 :     if (concurrent)
     235 ECB             :     {
     236 GIC          36 :         List       *indexoidlist = RelationGetIndexList(matviewRel);
     237 ECB             :         ListCell   *indexoidscan;
     238 GIC          36 :         bool        hasUniqueIndex = false;
     239 ECB             : 
     240 GIC          42 :         foreach(indexoidscan, indexoidlist)
     241 ECB             :         {
     242 GIC          39 :             Oid         indexoid = lfirst_oid(indexoidscan);
     243 ECB             :             Relation    indexRel;
     244                 : 
     245 GIC          39 :             indexRel = index_open(indexoid, AccessShareLock);
     246 CBC          39 :             hasUniqueIndex = is_usable_unique_index(indexRel);
     247              39 :             index_close(indexRel, AccessShareLock);
     248              39 :             if (hasUniqueIndex)
     249              33 :                 break;
     250 ECB             :         }
     251                 : 
     252 GIC          36 :         list_free(indexoidlist);
     253 ECB             : 
     254 GIC          36 :         if (!hasUniqueIndex)
     255 CBC           3 :             ereport(ERROR,
     256 ECB             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     257                 :                      errmsg("cannot refresh materialized view \"%s\" concurrently",
     258                 :                             quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
     259                 :                                                        RelationGetRelationName(matviewRel))),
     260                 :                      errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
     261                 :     }
     262                 : 
     263                 :     /*
     264                 :      * The stored query was rewritten at the time of the MV definition, but
     265                 :      * has not been scribbled on by the planner.
     266                 :      */
     267 GIC         114 :     dataQuery = linitial_node(Query, actions);
     268 ECB             : 
     269                 :     /*
     270                 :      * Check for active uses of the relation in the current transaction, such
     271                 :      * as open scans.
     272                 :      *
     273                 :      * NB: We count on this to protect us against problems with refreshing the
     274                 :      * data using TABLE_INSERT_FROZEN.
     275                 :      */
     276 GIC         114 :     CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
     277 ECB             : 
     278                 :     /*
     279                 :      * Tentatively mark the matview as populated or not (this will roll back
     280                 :      * if we fail later).
     281                 :      */
     282 GIC         114 :     SetMatViewPopulatedState(matviewRel, !stmt->skipData);
     283 ECB             : 
     284                 :     /* Concurrent refresh builds new data in temp tablespace, and does diff. */
     285 GIC         114 :     if (concurrent)
     286 ECB             :     {
     287 GIC          33 :         tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
     288 CBC          33 :         relpersistence = RELPERSISTENCE_TEMP;
     289 ECB             :     }
     290                 :     else
     291                 :     {
     292 GIC          81 :         tableSpace = matviewRel->rd_rel->reltablespace;
     293 CBC          81 :         relpersistence = matviewRel->rd_rel->relpersistence;
     294 ECB             :     }
     295                 : 
     296                 :     /*
     297                 :      * Create the transient table that will receive the regenerated data. Lock
     298                 :      * it against access by any other process until commit (by which time it
     299                 :      * will be gone).
     300                 :      */
     301 GIC         228 :     OIDNewHeap = make_new_heap(matviewOid, tableSpace,
     302 CBC         114 :                                matviewRel->rd_rel->relam,
     303 ECB             :                                relpersistence, ExclusiveLock);
     304 GIC         114 :     LockRelationOid(OIDNewHeap, AccessExclusiveLock);
     305 CBC         114 :     dest = CreateTransientRelDestReceiver(OIDNewHeap);
     306 ECB             : 
     307                 :     /* Generate the data, if wanted. */
     308 GIC         114 :     if (!stmt->skipData)
     309 CBC         114 :         processed = refresh_matview_datafill(dest, dataQuery, queryString);
     310 ECB             : 
     311                 :     /* Make the matview match the newly generated data. */
     312 GIC          96 :     if (concurrent)
     313 ECB             :     {
     314 GIC          33 :         int         old_depth = matview_maintenance_depth;
     315 ECB             : 
     316 GIC          33 :         PG_TRY();
     317 ECB             :         {
     318 GIC          33 :             refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
     319 ECB             :                                    save_sec_context);
     320                 :         }
     321 GIC           3 :         PG_CATCH();
     322 ECB             :         {
     323 GIC           3 :             matview_maintenance_depth = old_depth;
     324 CBC           3 :             PG_RE_THROW();
     325 ECB             :         }
     326 GIC          30 :         PG_END_TRY();
     327 CBC          30 :         Assert(matview_maintenance_depth == old_depth);
     328 ECB             :     }
     329                 :     else
     330                 :     {
     331 GIC          63 :         refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
     332 ECB             : 
     333                 :         /*
     334                 :          * Inform cumulative stats system about our activity: basically, we
     335                 :          * truncated the matview and inserted some new data.  (The concurrent
     336                 :          * code path above doesn't need to worry about this because the
     337                 :          * inserts and deletes it issues get counted by lower-level code.)
     338                 :          */
     339 GIC          60 :         pgstat_count_truncate(matviewRel);
     340 CBC          60 :         if (!stmt->skipData)
     341              60 :             pgstat_count_heap_insert(matviewRel, processed);
     342 ECB             :     }
     343                 : 
     344 GIC          90 :     table_close(matviewRel, NoLock);
     345 ECB             : 
     346                 :     /* Roll back any GUC changes */
     347 GIC          90 :     AtEOXact_GUC(false, save_nestlevel);
     348 ECB             : 
     349                 :     /* Restore userid and security context */
     350 GIC          90 :     SetUserIdAndSecContext(save_userid, save_sec_context);
     351 ECB             : 
     352 GIC          90 :     ObjectAddressSet(address, RelationRelationId, matviewOid);
     353 ECB             : 
     354                 :     /*
     355                 :      * Save the rowcount so that pg_stat_statements can track the total number
     356                 :      * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
     357                 :      * still don't display the rowcount in the command completion tag output,
     358                 :      * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
     359                 :      * command tag is left false in cmdtaglist.h. Otherwise, the change of
     360                 :      * completion tag output might break applications using it.
     361                 :      */
     362 GIC          90 :     if (qc)
     363 CBC          90 :         SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
     364 ECB             : 
     365 GIC          90 :     return address;
     366 ECB             : }
     367                 : 
     368                 : /*
     369                 :  * refresh_matview_datafill
     370                 :  *
     371                 :  * Execute the given query, sending result rows to "dest" (which will
     372                 :  * insert them into the target matview).
     373                 :  *
     374                 :  * Returns number of rows inserted.
     375                 :  */
     376                 : static uint64
     377 GIC         114 : refresh_matview_datafill(DestReceiver *dest, Query *query,
     378 ECB             :                          const char *queryString)
     379                 : {
     380                 :     List       *rewritten;
     381                 :     PlannedStmt *plan;
     382                 :     QueryDesc  *queryDesc;
     383                 :     Query      *copied_query;
     384                 :     uint64      processed;
     385                 : 
     386                 :     /* Lock and rewrite, using a copy to preserve the original query. */
     387 GIC         114 :     copied_query = copyObject(query);
     388 CBC         114 :     AcquireRewriteLocks(copied_query, true, false);
     389             114 :     rewritten = QueryRewrite(copied_query);
     390 ECB             : 
     391                 :     /* SELECT should never rewrite to more or less than one SELECT query */
     392 GIC         114 :     if (list_length(rewritten) != 1)
     393 LBC           0 :         elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
     394 GBC         114 :     query = (Query *) linitial(rewritten);
     395 ECB             : 
     396                 :     /* Check for user-requested abort. */
     397 GIC         114 :     CHECK_FOR_INTERRUPTS();
     398 ECB             : 
     399                 :     /* Plan the query which will generate data for the refresh. */
     400 GIC         114 :     plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
     401 ECB             : 
     402                 :     /*
     403                 :      * Use a snapshot with an updated command ID to ensure this query sees
     404                 :      * results of any previously executed queries.  (This could only matter if
     405                 :      * the planner executed an allegedly-stable function that changed the
     406                 :      * database contents, but let's do it anyway to be safe.)
     407                 :      */
     408 GIC         111 :     PushCopiedSnapshot(GetActiveSnapshot());
     409 CBC         111 :     UpdateActiveSnapshotCommandId();
     410 ECB             : 
     411                 :     /* Create a QueryDesc, redirecting output to our tuple receiver */
     412 GIC         111 :     queryDesc = CreateQueryDesc(plan, queryString,
     413 ECB             :                                 GetActiveSnapshot(), InvalidSnapshot,
     414                 :                                 dest, NULL, NULL, 0);
     415                 : 
     416                 :     /* call ExecutorStart to prepare the plan for execution */
     417 GIC         111 :     ExecutorStart(queryDesc, 0);
     418 ECB             : 
     419                 :     /* run the plan */
     420 GNC         111 :     ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
     421 ECB             : 
     422 GIC          96 :     processed = queryDesc->estate->es_processed;
     423 ECB             : 
     424                 :     /* and clean up */
     425 GIC          96 :     ExecutorFinish(queryDesc);
     426 CBC          96 :     ExecutorEnd(queryDesc);
     427 ECB             : 
     428 GIC          96 :     FreeQueryDesc(queryDesc);
     429 ECB             : 
     430 GIC          96 :     PopActiveSnapshot();
     431 ECB             : 
     432 GIC          96 :     return processed;
     433 ECB             : }
     434                 : 
     435                 : DestReceiver *
     436 GIC         114 : CreateTransientRelDestReceiver(Oid transientoid)
     437 ECB             : {
     438 GIC         114 :     DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
     439 ECB             : 
     440 GIC         114 :     self->pub.receiveSlot = transientrel_receive;
     441 CBC         114 :     self->pub.rStartup = transientrel_startup;
     442             114 :     self->pub.rShutdown = transientrel_shutdown;
     443             114 :     self->pub.rDestroy = transientrel_destroy;
     444             114 :     self->pub.mydest = DestTransientRel;
     445             114 :     self->transientoid = transientoid;
     446 ECB             : 
     447 GIC         114 :     return (DestReceiver *) self;
     448 ECB             : }
     449                 : 
     450                 : /*
     451                 :  * transientrel_startup --- executor startup
     452                 :  */
     453                 : static void
     454 GIC         111 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
     455 ECB             : {
     456 GIC         111 :     DR_transientrel *myState = (DR_transientrel *) self;
     457 ECB             :     Relation    transientrel;
     458                 : 
     459 GIC         111 :     transientrel = table_open(myState->transientoid, NoLock);
     460 ECB             : 
     461                 :     /*
     462                 :      * Fill private fields of myState for use by later routines
     463                 :      */
     464 GIC         111 :     myState->transientrel = transientrel;
     465 CBC         111 :     myState->output_cid = GetCurrentCommandId(true);
     466             111 :     myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
     467             111 :     myState->bistate = GetBulkInsertState();
     468 ECB             : 
     469                 :     /*
     470                 :      * Valid smgr_targblock implies something already wrote to the relation.
     471                 :      * This may be harmless, but this function hasn't planned for it.
     472                 :      */
     473 GIC         111 :     Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
     474 CBC         111 : }
     475 ECB             : 
     476                 : /*
     477                 :  * transientrel_receive --- receive one tuple
     478                 :  */
     479                 : static bool
     480 GIC         284 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
     481 ECB             : {
     482 GIC         284 :     DR_transientrel *myState = (DR_transientrel *) self;
     483 ECB             : 
     484                 :     /*
     485                 :      * Note that the input slot might not be of the type of the target
     486                 :      * relation. That's supported by table_tuple_insert(), but slightly less
     487                 :      * efficient than inserting with the right slot - but the alternative
     488                 :      * would be to copy into a slot of the right type, which would not be
     489                 :      * cheap either. This also doesn't allow accessing per-AM data (say a
     490                 :      * tuple's xmin), but since we don't do that here...
     491                 :      */
     492                 : 
     493 GIC         284 :     table_tuple_insert(myState->transientrel,
     494 ECB             :                        slot,
     495                 :                        myState->output_cid,
     496                 :                        myState->ti_options,
     497                 :                        myState->bistate);
     498                 : 
     499                 :     /* We know this is a newly created relation, so there are no indexes */
     500                 : 
     501 GIC         284 :     return true;
     502 ECB             : }
     503                 : 
     504                 : /*
     505                 :  * transientrel_shutdown --- executor end
     506                 :  */
     507                 : static void
     508 GIC          96 : transientrel_shutdown(DestReceiver *self)
     509 ECB             : {
     510 GIC          96 :     DR_transientrel *myState = (DR_transientrel *) self;
     511 ECB             : 
     512 GIC          96 :     FreeBulkInsertState(myState->bistate);
     513 ECB             : 
     514 GIC          96 :     table_finish_bulk_insert(myState->transientrel, myState->ti_options);
     515 ECB             : 
     516                 :     /* close transientrel, but keep lock until commit */
     517 GIC          96 :     table_close(myState->transientrel, NoLock);
     518 CBC          96 :     myState->transientrel = NULL;
     519              96 : }
     520 ECB             : 
     521                 : /*
     522                 :  * transientrel_destroy --- release DestReceiver object
     523                 :  */
     524                 : static void
     525 UIC           0 : transientrel_destroy(DestReceiver *self)
     526 EUB             : {
     527 UIC           0 :     pfree(self);
     528 UBC           0 : }
     529 EUB             : 
     530                 : 
     531                 : /*
     532                 :  * Given a qualified temporary table name, append an underscore followed by
     533                 :  * the given integer, to make a new table name based on the old one.
     534                 :  * The result is a palloc'd string.
     535                 :  *
     536                 :  * As coded, this would fail to make a valid SQL name if the given name were,
     537                 :  * say, "FOO"."BAR".  Currently, the table name portion of the input will
     538                 :  * never be double-quoted because it's of the form "pg_temp_NNN", cf
     539                 :  * make_new_heap().  But we might have to work harder someday.
     540                 :  */
     541                 : static char *
     542 GIC          33 : make_temptable_name_n(char *tempname, int n)
     543 ECB             : {
     544                 :     StringInfoData namebuf;
     545                 : 
     546 GIC          33 :     initStringInfo(&namebuf);
     547 CBC          33 :     appendStringInfoString(&namebuf, tempname);
     548              33 :     appendStringInfo(&namebuf, "_%d", n);
     549              33 :     return namebuf.data;
     550 ECB             : }
     551                 : 
     552                 : /*
     553                 :  * refresh_by_match_merge
     554                 :  *
     555                 :  * Refresh a materialized view with transactional semantics, while allowing
     556                 :  * concurrent reads.
     557                 :  *
     558                 :  * This is called after a new version of the data has been created in a
     559                 :  * temporary table.  It performs a full outer join against the old version of
     560                 :  * the data, producing "diff" results.  This join cannot work if there are any
     561                 :  * duplicated rows in either the old or new versions, in the sense that every
     562                 :  * column would compare as equal between the two rows.  It does work correctly
     563                 :  * in the face of rows which have at least one NULL value, with all non-NULL
     564                 :  * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
     565                 :  * indexes turns out to be quite convenient here; the tests we need to make
     566                 :  * are consistent with default behavior.  If there is at least one UNIQUE
     567                 :  * index on the materialized view, we have exactly the guarantee we need.
     568                 :  *
     569                 :  * The temporary table used to hold the diff results contains just the TID of
     570                 :  * the old record (if matched) and the ROW from the new table as a single
     571                 :  * column of complex record type (if matched).
     572                 :  *
     573                 :  * Once we have the diff table, we perform set-based DELETE and INSERT
     574                 :  * operations against the materialized view, and discard both temporary
     575                 :  * tables.
     576                 :  *
     577                 :  * Everything from the generation of the new data to applying the differences
     578                 :  * takes place under cover of an ExclusiveLock, since it seems as though we
     579                 :  * would want to prohibit not only concurrent REFRESH operations, but also
     580                 :  * incremental maintenance.  It also doesn't seem reasonable or safe to allow
     581                 :  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
     582                 :  * this command.
     583                 :  */
     584                 : static void
     585 GIC          33 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
     586 ECB             :                        int save_sec_context)
     587                 : {
     588                 :     StringInfoData querybuf;
     589                 :     Relation    matviewRel;
     590                 :     Relation    tempRel;
     591                 :     char       *matviewname;
     592                 :     char       *tempname;
     593                 :     char       *diffname;
     594                 :     TupleDesc   tupdesc;
     595                 :     bool        foundUniqueIndex;
     596                 :     List       *indexoidlist;
     597                 :     ListCell   *indexoidscan;
     598                 :     int16       relnatts;
     599                 :     Oid        *opUsedForQual;
     600                 : 
     601 GIC          33 :     initStringInfo(&querybuf);
     602 CBC          33 :     matviewRel = table_open(matviewOid, NoLock);
     603              33 :     matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
     604              33 :                                              RelationGetRelationName(matviewRel));
     605              33 :     tempRel = table_open(tempOid, NoLock);
     606              33 :     tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
     607              33 :                                           RelationGetRelationName(tempRel));
     608              33 :     diffname = make_temptable_name_n(tempname, 2);
     609 ECB             : 
     610 GIC          33 :     relnatts = RelationGetNumberOfAttributes(matviewRel);
     611 ECB             : 
     612                 :     /* Open SPI context. */
     613 GIC          33 :     if (SPI_connect() != SPI_OK_CONNECT)
     614 LBC           0 :         elog(ERROR, "SPI_connect failed");
     615 EUB             : 
     616                 :     /* Analyze the temp table with the new contents. */
     617 GIC          33 :     appendStringInfo(&querybuf, "ANALYZE %s", tempname);
     618 CBC          33 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     619 LBC           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     620 EUB             : 
     621                 :     /*
     622                 :      * We need to ensure that there are not duplicate rows without NULLs in
     623                 :      * the new data set before we can count on the "diff" results.  Check for
     624                 :      * that in a way that allows showing the first duplicated row found.  Even
     625                 :      * after we pass this test, a unique index on the materialized view may
     626                 :      * find a duplicate key problem.
     627                 :      *
     628                 :      * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
     629                 :      * keep ".*" from being expanded into multiple columns in a SELECT list.
     630                 :      * Compare ruleutils.c's get_variable().
     631                 :      */
     632 GIC          33 :     resetStringInfo(&querybuf);
     633 CBC          33 :     appendStringInfo(&querybuf,
     634 ECB             :                      "SELECT newdata.*::%s FROM %s newdata "
     635                 :                      "WHERE newdata.* IS NOT NULL AND EXISTS "
     636                 :                      "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
     637                 :                      "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
     638                 :                      "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
     639                 :                      "newdata.ctid)",
     640                 :                      tempname, tempname, tempname);
     641 GIC          33 :     if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
     642 LBC           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     643 GBC          33 :     if (SPI_processed > 0)
     644 ECB             :     {
     645                 :         /*
     646                 :          * Note that this ereport() is returning data to the user.  Generally,
     647                 :          * we would want to make sure that the user has been granted access to
     648                 :          * this data.  However, REFRESH MAT VIEW is only able to be run by the
     649                 :          * owner of the mat view (or a superuser) and therefore there is no
     650                 :          * need to check for access to data in the mat view.
     651                 :          */
     652 GIC           3 :         ereport(ERROR,
     653 ECB             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
     654                 :                  errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
     655                 :                         RelationGetRelationName(matviewRel)),
     656                 :                  errdetail("Row: %s",
     657                 :                            SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
     658                 :     }
     659                 : 
     660 GIC          30 :     SetUserIdAndSecContext(relowner,
     661 ECB             :                            save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
     662                 : 
     663                 :     /* Start building the query for creating the diff table. */
     664 GIC          30 :     resetStringInfo(&querybuf);
     665 CBC          30 :     appendStringInfo(&querybuf,
     666 ECB             :                      "CREATE TEMP TABLE %s AS "
     667                 :                      "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
     668                 :                      "FROM %s mv FULL JOIN %s newdata ON (",
     669                 :                      diffname, tempname, matviewname, tempname);
     670                 : 
     671                 :     /*
     672                 :      * Get the list of index OIDs for the table from the relcache, and look up
     673                 :      * each one in the pg_index syscache.  We will test for equality on all
     674                 :      * columns present in all unique indexes which only reference columns and
     675                 :      * include all rows.
     676                 :      */
     677 GIC          30 :     tupdesc = matviewRel->rd_att;
     678 CBC          30 :     opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
     679              30 :     foundUniqueIndex = false;
     680 ECB             : 
     681 GIC          30 :     indexoidlist = RelationGetIndexList(matviewRel);
     682 ECB             : 
     683 GIC          66 :     foreach(indexoidscan, indexoidlist)
     684 ECB             :     {
     685 GIC          36 :         Oid         indexoid = lfirst_oid(indexoidscan);
     686 ECB             :         Relation    indexRel;
     687                 : 
     688 GIC          36 :         indexRel = index_open(indexoid, RowExclusiveLock);
     689 CBC          36 :         if (is_usable_unique_index(indexRel))
     690 ECB             :         {
     691 GIC          36 :             Form_pg_index indexStruct = indexRel->rd_index;
     692 CBC          36 :             int         indnkeyatts = indexStruct->indnkeyatts;
     693 ECB             :             oidvector  *indclass;
     694                 :             Datum       indclassDatum;
     695                 :             int         i;
     696                 : 
     697                 :             /* Must get indclass the hard way. */
     698 GNC          36 :             indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
     699              36 :                                                    indexRel->rd_indextuple,
     700                 :                                                    Anum_pg_index_indclass);
     701 GIC          36 :             indclass = (oidvector *) DatumGetPointer(indclassDatum);
     702 ECB             : 
     703                 :             /* Add quals for all columns from this index. */
     704 CBC          80 :             for (i = 0; i < indnkeyatts; i++)
     705 ECB             :             {
     706 CBC          44 :                 int         attnum = indexStruct->indkey.values[i];
     707              44 :                 Oid         opclass = indclass->values[i];
     708 GIC          44 :                 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
     709              44 :                 Oid         attrtype = attr->atttypid;
     710                 :                 HeapTuple   cla_ht;
     711                 :                 Form_pg_opclass cla_tup;
     712                 :                 Oid         opfamily;
     713                 :                 Oid         opcintype;
     714                 :                 Oid         op;
     715                 :                 const char *leftop;
     716                 :                 const char *rightop;
     717                 : 
     718                 :                 /*
     719                 :                  * Identify the equality operator associated with this index
     720 ECB             :                  * column.  First we need to look up the column's opclass.
     721                 :                  */
     722 GBC          44 :                 cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
     723 CBC          44 :                 if (!HeapTupleIsValid(cla_ht))
     724 LBC           0 :                     elog(ERROR, "cache lookup failed for opclass %u", opclass);
     725 CBC          44 :                 cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
     726              44 :                 Assert(cla_tup->opcmethod == BTREE_AM_OID);
     727              44 :                 opfamily = cla_tup->opcfamily;
     728 GIC          44 :                 opcintype = cla_tup->opcintype;
     729 CBC          44 :                 ReleaseSysCache(cla_ht);
     730                 : 
     731              44 :                 op = get_opfamily_member(opfamily, opcintype, opcintype,
     732 EUB             :                                          BTEqualStrategyNumber);
     733 GIC          44 :                 if (!OidIsValid(op))
     734 UIC           0 :                     elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     735                 :                          BTEqualStrategyNumber, opcintype, opcintype, opfamily);
     736                 : 
     737                 :                 /*
     738                 :                  * If we find the same column with the same equality semantics
     739                 :                  * in more than one index, we only need to emit the equality
     740                 :                  * clause once.
     741                 :                  *
     742                 :                  * Since we only remember the last equality operator, this
     743                 :                  * code could be fooled into emitting duplicate clauses given
     744                 :                  * multiple indexes with several different opclasses ... but
     745                 :                  * that's so unlikely it doesn't seem worth spending extra
     746 ECB             :                  * code to avoid.
     747 EUB             :                  */
     748 CBC          44 :                 if (opUsedForQual[attnum - 1] == op)
     749 UIC           0 :                     continue;
     750 GIC          44 :                 opUsedForQual[attnum - 1] = op;
     751                 : 
     752                 :                 /*
     753 ECB             :                  * Actually add the qual, ANDed with any others.
     754                 :                  */
     755 GIC          44 :                 if (foundUniqueIndex)
     756 CBC          14 :                     appendStringInfoString(&querybuf, " AND ");
     757 ECB             : 
     758 CBC          44 :                 leftop = quote_qualified_identifier("newdata",
     759              44 :                                                     NameStr(attr->attname));
     760 GIC          44 :                 rightop = quote_qualified_identifier("mv",
     761 CBC          44 :                                                      NameStr(attr->attname));
     762                 : 
     763 GIC          44 :                 generate_operator_clause(&querybuf,
     764                 :                                          leftop, attrtype,
     765                 :                                          op,
     766 ECB             :                                          rightop, attrtype);
     767                 : 
     768 GIC          44 :                 foundUniqueIndex = true;
     769                 :             }
     770                 :         }
     771 ECB             : 
     772                 :         /* Keep the locks, since we're about to run DML which needs them. */
     773 GIC          36 :         index_close(indexRel, NoLock);
     774 ECB             :     }
     775                 : 
     776 GIC          30 :     list_free(indexoidlist);
     777                 : 
     778                 :     /*
     779                 :      * There must be at least one usable unique index on the matview.
     780                 :      *
     781                 :      * ExecRefreshMatView() checks that after taking the exclusive lock on the
     782                 :      * matview. So at least one unique index is guaranteed to exist here
     783 ECB             :      * because the lock is still being held; so an Assert seems sufficient.
     784                 :      */
     785 CBC          30 :     Assert(foundUniqueIndex);
     786                 : 
     787 GIC          30 :     appendStringInfoString(&querybuf,
     788                 :                            " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
     789                 :                            "WHERE newdata.* IS NULL OR mv.* IS NULL "
     790                 :                            "ORDER BY tid");
     791 ECB             : 
     792 EUB             :     /* Create the temporary "diff" table. */
     793 GIC          30 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     794 LBC           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     795                 : 
     796 GIC          30 :     SetUserIdAndSecContext(relowner,
     797                 :                            save_sec_context | SECURITY_RESTRICTED_OPERATION);
     798                 : 
     799                 :     /*
     800                 :      * We have no further use for data from the "full-data" temp table, but we
     801                 :      * must keep it around because its type is referenced from the diff table.
     802                 :      */
     803 ECB             : 
     804                 :     /* Analyze the diff table. */
     805 CBC          30 :     resetStringInfo(&querybuf);
     806 GBC          30 :     appendStringInfo(&querybuf, "ANALYZE %s", diffname);
     807 GIC          30 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     808 LBC           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     809                 : 
     810 GIC          30 :     OpenMatViewIncrementalMaintenance();
     811 ECB             : 
     812                 :     /* Deletes must come before inserts; do them first. */
     813 GIC          30 :     resetStringInfo(&querybuf);
     814              30 :     appendStringInfo(&querybuf,
     815                 :                      "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
     816                 :                      "(SELECT diff.tid FROM %s diff "
     817                 :                      "WHERE diff.tid IS NOT NULL "
     818 ECB             :                      "AND diff.newdata IS NULL)",
     819 EUB             :                      matviewname, diffname);
     820 GIC          30 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
     821 UIC           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     822 ECB             : 
     823                 :     /* Inserts go last. */
     824 GIC          30 :     resetStringInfo(&querybuf);
     825              30 :     appendStringInfo(&querybuf,
     826                 :                      "INSERT INTO %s SELECT (diff.newdata).* "
     827 ECB             :                      "FROM %s diff WHERE tid IS NULL",
     828 EUB             :                      matviewname, diffname);
     829 GIC          30 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
     830 UIC           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     831 ECB             : 
     832                 :     /* We're done maintaining the materialized view. */
     833 CBC          30 :     CloseMatViewIncrementalMaintenance();
     834 GIC          30 :     table_close(tempRel, NoLock);
     835              30 :     table_close(matviewRel, NoLock);
     836 ECB             : 
     837                 :     /* Clean up temp tables. */
     838 CBC          30 :     resetStringInfo(&querybuf);
     839 GBC          30 :     appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
     840 GIC          30 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     841 UIC           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     842 ECB             : 
     843 EUB             :     /* Close SPI context. */
     844 CBC          30 :     if (SPI_finish() != SPI_OK_FINISH)
     845 UIC           0 :         elog(ERROR, "SPI_finish failed");
     846 GIC          30 : }
     847                 : 
     848                 : /*
     849                 :  * Swap the physical files of the target and transient tables, then rebuild
     850                 :  * the target's indexes and throw away the transient table.  Security context
     851                 :  * swapping is handled by the called function, so it is not needed here.
     852 ECB             :  */
     853                 : static void
     854 CBC          63 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
     855                 : {
     856              63 :     finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
     857                 :                      RecentXmin, ReadNextMultiXactId(), relpersistence);
     858 GIC          60 : }
     859                 : 
     860                 : /*
     861                 :  * Check whether specified index is usable for match merge.
     862 ECB             :  */
     863                 : static bool
     864 CBC          75 : is_usable_unique_index(Relation indexRel)
     865                 : {
     866 GIC          75 :     Form_pg_index indexStruct = indexRel->rd_index;
     867                 : 
     868                 :     /*
     869                 :      * Must be unique, valid, immediate, non-partial, and be defined over
     870                 :      * plain user columns (not expressions).  We also require it to be a
     871                 :      * btree.  Even if we had any other unique index kinds, we'd not know how
     872                 :      * to identify the corresponding equality operator, nor could we be sure
     873                 :      * that the planner could implement the required FULL JOIN with non-btree
     874 ECB             :      * operators.
     875                 :      */
     876 CBC          75 :     if (indexStruct->indisunique &&
     877              75 :         indexStruct->indimmediate &&
     878              75 :         indexRel->rd_rel->relam == BTREE_AM_OID &&
     879             150 :         indexStruct->indisvalid &&
     880 GIC          75 :         RelationGetIndexPredicate(indexRel) == NIL &&
     881              72 :         indexStruct->indnatts > 0)
     882                 :     {
     883                 :         /*
     884                 :          * The point of groveling through the index columns individually is to
     885                 :          * reject both index expressions and system columns.  Currently,
     886                 :          * matviews couldn't have OID columns so there's no way to create an
     887                 :          * index on a system column; but maybe someday that wouldn't be true,
     888 ECB             :          * so let's be safe.
     889                 :          */
     890 GIC          72 :         int         numatts = indexStruct->indnatts;
     891 ECB             :         int         i;
     892                 : 
     893 CBC         157 :         for (i = 0; i < numatts; i++)
     894                 :         {
     895              88 :             int         attnum = indexStruct->indkey.values[i];
     896 ECB             : 
     897 GIC          88 :             if (attnum <= 0)
     898 CBC           3 :                 return false;
     899                 :         }
     900              69 :         return true;
     901                 :     }
     902 GIC           3 :     return false;
     903                 : }
     904                 : 
     905                 : 
     906                 : /*
     907                 :  * This should be used to test whether the backend is in a context where it is
     908                 :  * OK to allow DML statements to modify materialized views.  We only want to
     909                 :  * allow that for internal code driven by the materialized view definition,
     910                 :  * not for arbitrary user-supplied code.
     911                 :  *
     912                 :  * While the function names reflect the fact that their main intended use is
     913                 :  * incremental maintenance of materialized views (in response to changes to
     914                 :  * the data in referenced relations), they are initially used to allow REFRESH
     915                 :  * without blocking concurrent reads.
     916 ECB             :  */
     917                 : bool
     918 CBC          60 : MatViewIncrementalMaintenanceIsEnabled(void)
     919                 : {
     920 GIC          60 :     return matview_maintenance_depth > 0;
     921                 : }
     922 ECB             : 
     923                 : static void
     924 CBC          30 : OpenMatViewIncrementalMaintenance(void)
     925 ECB             : {
     926 GIC          30 :     matview_maintenance_depth++;
     927              30 : }
     928 ECB             : 
     929                 : static void
     930 CBC          30 : CloseMatViewIncrementalMaintenance(void)
     931 ECB             : {
     932 CBC          30 :     matview_maintenance_depth--;
     933 GIC          30 :     Assert(matview_maintenance_depth >= 0);
     934              30 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a