Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * matview.c
4 : * materialized view support
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/matview.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/multixact.h"
21 : #include "access/tableam.h"
22 : #include "access/xact.h"
23 : #include "access/xlog.h"
24 : #include "catalog/catalog.h"
25 : #include "catalog/indexing.h"
26 : #include "catalog/namespace.h"
27 : #include "catalog/pg_am.h"
28 : #include "catalog/pg_opclass.h"
29 : #include "catalog/pg_operator.h"
30 : #include "commands/cluster.h"
31 : #include "commands/matview.h"
32 : #include "commands/tablecmds.h"
33 : #include "commands/tablespace.h"
34 : #include "executor/executor.h"
35 : #include "executor/spi.h"
36 : #include "miscadmin.h"
37 : #include "parser/parse_relation.h"
38 : #include "pgstat.h"
39 : #include "rewrite/rewriteHandler.h"
40 : #include "storage/lmgr.h"
41 : #include "storage/smgr.h"
42 : #include "tcop/tcopprot.h"
43 : #include "utils/builtins.h"
44 : #include "utils/lsyscache.h"
45 : #include "utils/rel.h"
46 : #include "utils/snapmgr.h"
47 : #include "utils/syscache.h"
48 :
49 :
/*
 * DR_transientrel
 *		DestReceiver state used to load query output into the transient heap
 *		built by REFRESH MATERIALIZED VIEW.
 *
 * "pub" must remain the first member: the transientrel_* callbacks receive a
 * DestReceiver pointer and cast it straight back to DR_transientrel.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	Oid			transientoid;	/* OID of new heap into which to store */
	/* These fields are filled by transientrel_startup: */
	Relation	transientrel;	/* relation to write to */
	CommandId	output_cid;		/* cmin to insert in output tuples */
	int			ti_options;		/* table_tuple_insert performance options */
	BulkInsertState bistate;	/* bulk insert state */
} DR_transientrel;
60 :
/*
 * Nesting depth of matview "incremental maintenance" sections in this backend.
 * ExecRefreshMatView saves it before refresh_by_match_merge and restores it
 * if that call errors out.  Presumably OpenMatViewIncrementalMaintenance and
 * CloseMatViewIncrementalMaintenance increment and decrement it (their
 * definitions are not shown here; TODO confirm).
 */
static int	matview_maintenance_depth = 0;

/* DestReceiver callbacks for loading the transient heap. */
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
/* Run the matview's query into "dest"; returns the row count. */
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
									   const char *queryString);
static char *make_temptable_name_n(char *tempname, int n);
/* The two strategies for installing freshly computed data. */
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
								   int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static bool is_usable_unique_index(Relation indexRel);
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
76 :
77 : /*
78 : * SetMatViewPopulatedState
79 : * Mark a materialized view as populated, or not.
80 : *
81 : * NOTE: caller must be holding an appropriate lock on the relation.
82 : */
83 : void
3625 tgl 84 CBC 285 : SetMatViewPopulatedState(Relation relation, bool newstate)
85 : {
86 : Relation pgrel;
87 : HeapTuple tuple;
88 :
3689 kgrittn 89 285 : Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
90 :
91 : /*
92 : * Update relation's pg_class entry. Crucial side-effect: other backends
93 : * (and this one too!) are sent SI message to make them rebuild relcache
94 : * entries.
95 : */
1539 andres 96 285 : pgrel = table_open(RelationRelationId, RowExclusiveLock);
3625 tgl 97 285 : tuple = SearchSysCacheCopy1(RELOID,
98 : ObjectIdGetDatum(RelationGetRelid(relation)));
99 285 : if (!HeapTupleIsValid(tuple))
3625 tgl 100 UBC 0 : elog(ERROR, "cache lookup failed for relation %u",
101 : RelationGetRelid(relation));
102 :
3625 tgl 103 CBC 285 : ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
104 :
2259 alvherre 105 285 : CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
106 :
3625 tgl 107 285 : heap_freetuple(tuple);
1539 andres 108 285 : table_close(pgrel, RowExclusiveLock);
109 :
110 : /*
111 : * Advance command counter to make the updated pg_class row locally
112 : * visible.
113 : */
3625 tgl 114 285 : CommandCounterIncrement();
3689 kgrittn 115 285 : }
116 :
/*
 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
 *
 * This refreshes the materialized view by creating a new table and swapping
 * the relfilenumbers of the new table and the old materialized view, so the OID
 * of the original materialized view is preserved. Thus we do not lose GRANT
 * nor references to this materialized view.
 *
 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
 * statement associated with the materialized view. The statement node's
 * skipData field shows whether the clause was used.
 *
 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
 * the new heap, it's better to create the indexes afterwards than to fill them
 * incrementally while we load.
 *
 * The matview's "populated" state is changed based on whether the contents
 * reflect the result set of the materialized view's query.
 *
 * With CONCURRENTLY, the new data is built in a temporary heap and merged
 * into the existing matview by refresh_by_match_merge.  Otherwise the new
 * heap replaces the old storage via refresh_by_heap_swap.
 *
 * Parameters:
 *	stmt		- parsed statement (target relation, concurrent, skipData)
 *	queryString - source text, passed through to planning/execution
 *	params		- not referenced in this function
 *	qc			- if non-NULL, receives the completion tag and row count
 *
 * Returns the ObjectAddress of the refreshed materialized view.
 */
ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
				   ParamListInfo params, QueryCompletion *qc)
{
	Oid			matviewOid;
	Relation	matviewRel;
	RewriteRule *rule;
	List	   *actions;
	Query	   *dataQuery;
	Oid			tableSpace;
	Oid			relowner;
	Oid			OIDNewHeap;
	DestReceiver *dest;
	uint64		processed = 0;
	bool		concurrent;
	LOCKMODE	lockmode;
	char		relpersistence;
	Oid			save_userid;
	int			save_sec_context;
	int			save_nestlevel;
	ObjectAddress address;

	/*
	 * Determine strength of lock needed.  CONCURRENTLY takes ExclusiveLock,
	 * so plain reads of the matview are not blocked; otherwise
	 * AccessExclusiveLock is taken.
	 */
	concurrent = stmt->concurrent;
	lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;

	/*
	 * Get a lock until end of transaction.  The callback performs the
	 * permission check before the lock is taken.
	 */
	matviewOid = RangeVarGetRelidExtended(stmt->relation,
										  lockmode, 0,
										  RangeVarCallbackMaintainsTable,
										  NULL);
	matviewRel = table_open(matviewOid, NoLock);
	relowner = matviewRel->rd_rel->relowner;

	/*
	 * Switch to the owner's userid, so that any functions are run as that
	 * user. Also lock down security-restricted operations and arrange to
	 * make GUC variable changes local to this command.
	 *
	 * NOTE(review): the ereport/elog exits below skip the explicit
	 * restoration at the end of this function.  Presumably transaction abort
	 * resets userid and GUC state; TODO confirm.
	 */
	GetUserIdAndSecContext(&save_userid, &save_sec_context);
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
	save_nestlevel = NewGUCNestLevel();

	/* Make sure it is a materialized view. */
	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("\"%s\" is not a materialized view",
						RelationGetRelationName(matviewRel))));

	/* Check that CONCURRENTLY is not specified if not populated. */
	if (concurrent && !RelationIsPopulated(matviewRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));

	/* Check that conflicting options have not been specified. */
	if (concurrent && stmt->skipData)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("%s and %s options cannot be used together",
						"CONCURRENTLY", "WITH NO DATA")));

	/*
	 * Check that everything is correct for a refresh. Problems at this point
	 * are internal errors, so elog is sufficient.  A matview must have
	 * exactly one SELECT INSTEAD rule with exactly one action.
	 */
	if (matviewRel->rd_rel->relhasrules == false ||
		matviewRel->rd_rules->numLocks < 1)
		elog(ERROR,
			 "materialized view \"%s\" is missing rewrite information",
			 RelationGetRelationName(matviewRel));

	if (matviewRel->rd_rules->numLocks > 1)
		elog(ERROR,
			 "materialized view \"%s\" has too many rules",
			 RelationGetRelationName(matviewRel));

	rule = matviewRel->rd_rules->rules[0];
	if (rule->event != CMD_SELECT || !(rule->isInstead))
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
			 RelationGetRelationName(matviewRel));

	actions = rule->actions;
	if (list_length(actions) != 1)
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a single action",
			 RelationGetRelationName(matviewRel));

	/*
	 * Check that there is a unique index with no WHERE clause on one or more
	 * columns of the materialized view if CONCURRENTLY is specified.  The
	 * diff join in refresh_by_match_merge relies on such an index; it only
	 * Asserts this, trusting the check made here under our lock.
	 */
	if (concurrent)
	{
		List	   *indexoidlist = RelationGetIndexList(matviewRel);
		ListCell   *indexoidscan;
		bool		hasUniqueIndex = false;

		foreach(indexoidscan, indexoidlist)
		{
			Oid			indexoid = lfirst_oid(indexoidscan);
			Relation	indexRel;

			indexRel = index_open(indexoid, AccessShareLock);
			hasUniqueIndex = is_usable_unique_index(indexRel);
			index_close(indexRel, AccessShareLock);
			/* One usable index is enough; stop scanning. */
			if (hasUniqueIndex)
				break;
		}

		list_free(indexoidlist);

		if (!hasUniqueIndex)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot refresh materialized view \"%s\" concurrently",
							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
													   RelationGetRelationName(matviewRel))),
					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
	}

	/*
	 * The stored query was rewritten at the time of the MV definition, but
	 * has not been scribbled on by the planner.
	 */
	dataQuery = linitial_node(Query, actions);

	/*
	 * Check for active uses of the relation in the current transaction, such
	 * as open scans.
	 *
	 * NB: We count on this to protect us against problems with refreshing the
	 * data using TABLE_INSERT_FROZEN.
	 */
	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");

	/*
	 * Tentatively mark the matview as populated or not (this will roll back
	 * if we fail later).
	 */
	SetMatViewPopulatedState(matviewRel, !stmt->skipData);

	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
	if (concurrent)
	{
		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
		relpersistence = RELPERSISTENCE_TEMP;
	}
	else
	{
		tableSpace = matviewRel->rd_rel->reltablespace;
		relpersistence = matviewRel->rd_rel->relpersistence;
	}

	/*
	 * Create the transient table that will receive the regenerated data. Lock
	 * it against access by any other process until commit (by which time it
	 * will be gone).
	 */
	OIDNewHeap = make_new_heap(matviewOid, tableSpace,
							   matviewRel->rd_rel->relam,
							   relpersistence, ExclusiveLock);
	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
	dest = CreateTransientRelDestReceiver(OIDNewHeap);

	/* Generate the data, if wanted. */
	if (!stmt->skipData)
		processed = refresh_matview_datafill(dest, dataQuery, queryString);

	/* Make the matview match the newly generated data. */
	if (concurrent)
	{
		int			old_depth = matview_maintenance_depth;

		/*
		 * If the merge errors out, put matview_maintenance_depth back to the
		 * value it had on entry before propagating the error.
		 */
		PG_TRY();
		{
			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
								   save_sec_context);
		}
		PG_CATCH();
		{
			matview_maintenance_depth = old_depth;
			PG_RE_THROW();
		}
		PG_END_TRY();
		Assert(matview_maintenance_depth == old_depth);
	}
	else
	{
		refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);

		/*
		 * Inform cumulative stats system about our activity: basically, we
		 * truncated the matview and inserted some new data. (The concurrent
		 * code path above doesn't need to worry about this because the
		 * inserts and deletes it issues get counted by lower-level code.)
		 */
		pgstat_count_truncate(matviewRel);
		if (!stmt->skipData)
			pgstat_count_heap_insert(matviewRel, processed);
	}

	/* Keep the lock taken above until end of transaction. */
	table_close(matviewRel, NoLock);

	/* Roll back any GUC changes */
	AtEOXact_GUC(false, save_nestlevel);

	/* Restore userid and security context */
	SetUserIdAndSecContext(save_userid, save_sec_context);

	ObjectAddressSet(address, RelationRelationId, matviewOid);

	/*
	 * Save the rowcount so that pg_stat_statements can track the total number
	 * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
	 * still don't display the rowcount in the command completion tag output,
	 * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
	 * command tag is left false in cmdtaglist.h. Otherwise, the change of
	 * completion tag output might break applications using it.
	 */
	if (qc)
		SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);

	return address;
}
367 :
368 : /*
369 : * refresh_matview_datafill
370 : *
371 : * Execute the given query, sending result rows to "dest" (which will
372 : * insert them into the target matview).
373 : *
374 : * Returns number of rows inserted.
375 : */
376 : static uint64
3689 kgrittn 377 GIC 114 : refresh_matview_datafill(DestReceiver *dest, Query *query,
3148 kgrittn 378 ECB : const char *queryString)
379 : {
380 : List *rewritten;
381 : PlannedStmt *plan;
382 : QueryDesc *queryDesc;
383 : Query *copied_query;
384 : uint64 processed;
385 :
386 : /* Lock and rewrite, using a copy to preserve the original query. */
3445 kgrittn 387 GIC 114 : copied_query = copyObject(query);
3321 tgl 388 CBC 114 : AcquireRewriteLocks(copied_query, true, false);
3445 kgrittn 389 114 : rewritten = QueryRewrite(copied_query);
3689 kgrittn 390 ECB :
391 : /* SELECT should never rewrite to more or less than one SELECT query */
3689 kgrittn 392 GIC 114 : if (list_length(rewritten) != 1)
3689 kgrittn 393 LBC 0 : elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
3689 kgrittn 394 GBC 114 : query = (Query *) linitial(rewritten);
3689 kgrittn 395 ECB :
396 : /* Check for user-requested abort. */
3689 kgrittn 397 GIC 114 : CHECK_FOR_INTERRUPTS();
3689 kgrittn 398 ECB :
399 : /* Plan the query which will generate data for the refresh. */
753 tmunro 400 GIC 114 : plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
3689 kgrittn 401 ECB :
402 : /*
403 : * Use a snapshot with an updated command ID to ensure this query sees
404 : * results of any previously executed queries. (This could only matter if
405 : * the planner executed an allegedly-stable function that changed the
406 : * database contents, but let's do it anyway to be safe.)
407 : */
3689 kgrittn 408 GIC 111 : PushCopiedSnapshot(GetActiveSnapshot());
3689 kgrittn 409 CBC 111 : UpdateActiveSnapshotCommandId();
3689 kgrittn 410 ECB :
411 : /* Create a QueryDesc, redirecting output to our tuple receiver */
3689 kgrittn 412 GIC 111 : queryDesc = CreateQueryDesc(plan, queryString,
3689 kgrittn 413 ECB : GetActiveSnapshot(), InvalidSnapshot,
414 : dest, NULL, NULL, 0);
415 :
416 : /* call ExecutorStart to prepare the plan for execution */
1601 andres 417 GIC 111 : ExecutorStart(queryDesc, 0);
3689 kgrittn 418 ECB :
419 : /* run the plan */
11 peter 420 GNC 111 : ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
3689 kgrittn 421 ECB :
2213 tgl 422 GIC 96 : processed = queryDesc->estate->es_processed;
2213 tgl 423 ECB :
424 : /* and clean up */
3689 kgrittn 425 GIC 96 : ExecutorFinish(queryDesc);
3689 kgrittn 426 CBC 96 : ExecutorEnd(queryDesc);
3689 kgrittn 427 ECB :
3689 kgrittn 428 GIC 96 : FreeQueryDesc(queryDesc);
3689 kgrittn 429 ECB :
3689 kgrittn 430 GIC 96 : PopActiveSnapshot();
2213 tgl 431 ECB :
2213 tgl 432 GIC 96 : return processed;
3689 kgrittn 433 ECB : }
434 :
435 : DestReceiver *
3689 kgrittn 436 GIC 114 : CreateTransientRelDestReceiver(Oid transientoid)
3689 kgrittn 437 ECB : {
3689 kgrittn 438 GIC 114 : DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
3689 kgrittn 439 ECB :
3689 kgrittn 440 GIC 114 : self->pub.receiveSlot = transientrel_receive;
3689 kgrittn 441 CBC 114 : self->pub.rStartup = transientrel_startup;
442 114 : self->pub.rShutdown = transientrel_shutdown;
443 114 : self->pub.rDestroy = transientrel_destroy;
444 114 : self->pub.mydest = DestTransientRel;
445 114 : self->transientoid = transientoid;
3689 kgrittn 446 ECB :
3689 kgrittn 447 GIC 114 : return (DestReceiver *) self;
3689 kgrittn 448 ECB : }
449 :
450 : /*
451 : * transientrel_startup --- executor startup
452 : */
453 : static void
3689 kgrittn 454 GIC 111 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
3689 kgrittn 455 ECB : {
3689 kgrittn 456 GIC 111 : DR_transientrel *myState = (DR_transientrel *) self;
3602 bruce 457 ECB : Relation transientrel;
458 :
1539 andres 459 GIC 111 : transientrel = table_open(myState->transientoid, NoLock);
3689 kgrittn 460 ECB :
461 : /*
462 : * Fill private fields of myState for use by later routines
463 : */
3689 kgrittn 464 GIC 111 : myState->transientrel = transientrel;
3689 kgrittn 465 CBC 111 : myState->output_cid = GetCurrentCommandId(true);
1113 noah 466 111 : myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
467 111 : myState->bistate = GetBulkInsertState();
1113 noah 468 ECB :
469 : /*
470 : * Valid smgr_targblock implies something already wrote to the relation.
471 : * This may be harmless, but this function hasn't planned for it.
472 : */
3689 kgrittn 473 GIC 111 : Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
3689 kgrittn 474 CBC 111 : }
3689 kgrittn 475 ECB :
476 : /*
477 : * transientrel_receive --- receive one tuple
478 : */
479 : static bool
3689 kgrittn 480 GIC 284 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
3689 kgrittn 481 ECB : {
3689 kgrittn 482 GIC 284 : DR_transientrel *myState = (DR_transientrel *) self;
3689 kgrittn 483 ECB :
484 : /*
485 : * Note that the input slot might not be of the type of the target
486 : * relation. That's supported by table_tuple_insert(), but slightly less
487 : * efficient than inserting with the right slot - but the alternative
488 : * would be to copy into a slot of the right type, which would not be
489 : * cheap either. This also doesn't allow accessing per-AM data (say a
490 : * tuple's xmin), but since we don't do that here...
491 : */
492 :
1417 andres 493 GIC 284 : table_tuple_insert(myState->transientrel,
1417 andres 494 ECB : slot,
495 : myState->output_cid,
496 : myState->ti_options,
497 : myState->bistate);
498 :
499 : /* We know this is a newly created relation, so there are no indexes */
500 :
2498 rhaas 501 GIC 284 : return true;
3689 kgrittn 502 ECB : }
503 :
504 : /*
505 : * transientrel_shutdown --- executor end
506 : */
507 : static void
3689 kgrittn 508 GIC 96 : transientrel_shutdown(DestReceiver *self)
3689 kgrittn 509 ECB : {
3689 kgrittn 510 GIC 96 : DR_transientrel *myState = (DR_transientrel *) self;
3689 kgrittn 511 ECB :
3689 kgrittn 512 GIC 96 : FreeBulkInsertState(myState->bistate);
3689 kgrittn 513 ECB :
1469 andres 514 GIC 96 : table_finish_bulk_insert(myState->transientrel, myState->ti_options);
3689 kgrittn 515 ECB :
516 : /* close transientrel, but keep lock until commit */
1539 andres 517 GIC 96 : table_close(myState->transientrel, NoLock);
3689 kgrittn 518 CBC 96 : myState->transientrel = NULL;
519 96 : }
3689 kgrittn 520 ECB :
521 : /*
522 : * transientrel_destroy --- release DestReceiver object
523 : */
524 : static void
3689 kgrittn 525 UIC 0 : transientrel_destroy(DestReceiver *self)
3689 kgrittn 526 EUB : {
3689 kgrittn 527 UIC 0 : pfree(self);
3689 kgrittn 528 UBC 0 : }
3554 kgrittn 529 EUB :
530 :
531 : /*
532 : * Given a qualified temporary table name, append an underscore followed by
533 : * the given integer, to make a new table name based on the old one.
534 : * The result is a palloc'd string.
535 : *
536 : * As coded, this would fail to make a valid SQL name if the given name were,
537 : * say, "FOO"."BAR". Currently, the table name portion of the input will
538 : * never be double-quoted because it's of the form "pg_temp_NNN", cf
539 : * make_new_heap(). But we might have to work harder someday.
540 : */
541 : static char *
3554 kgrittn 542 GIC 33 : make_temptable_name_n(char *tempname, int n)
3554 kgrittn 543 ECB : {
544 : StringInfoData namebuf;
545 :
3554 kgrittn 546 GIC 33 : initStringInfo(&namebuf);
3554 kgrittn 547 CBC 33 : appendStringInfoString(&namebuf, tempname);
2557 peter_e 548 33 : appendStringInfo(&namebuf, "_%d", n);
3554 kgrittn 549 33 : return namebuf.data;
3554 kgrittn 550 ECB : }
551 :
552 : /*
553 : * refresh_by_match_merge
554 : *
555 : * Refresh a materialized view with transactional semantics, while allowing
556 : * concurrent reads.
557 : *
558 : * This is called after a new version of the data has been created in a
559 : * temporary table. It performs a full outer join against the old version of
560 : * the data, producing "diff" results. This join cannot work if there are any
561 : * duplicated rows in either the old or new versions, in the sense that every
562 : * column would compare as equal between the two rows. It does work correctly
563 : * in the face of rows which have at least one NULL value, with all non-NULL
564 : * columns equal. The behavior of NULLs on equality tests and on UNIQUE
565 : * indexes turns out to be quite convenient here; the tests we need to make
566 : * are consistent with default behavior. If there is at least one UNIQUE
567 : * index on the materialized view, we have exactly the guarantee we need.
568 : *
569 : * The temporary table used to hold the diff results contains just the TID of
570 : * the old record (if matched) and the ROW from the new table as a single
571 : * column of complex record type (if matched).
572 : *
573 : * Once we have the diff table, we perform set-based DELETE and INSERT
574 : * operations against the materialized view, and discard both temporary
575 : * tables.
576 : *
577 : * Everything from the generation of the new data to applying the differences
578 : * takes place under cover of an ExclusiveLock, since it seems as though we
579 : * would want to prohibit not only concurrent REFRESH operations, but also
580 : * incremental maintenance. It also doesn't seem reasonable or safe to allow
581 : * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
582 : * this command.
583 : */
584 : static void
3148 kgrittn 585 GIC 33 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
3148 kgrittn 586 ECB : int save_sec_context)
587 : {
588 : StringInfoData querybuf;
589 : Relation matviewRel;
590 : Relation tempRel;
591 : char *matviewname;
592 : char *tempname;
593 : char *diffname;
594 : TupleDesc tupdesc;
595 : bool foundUniqueIndex;
596 : List *indexoidlist;
597 : ListCell *indexoidscan;
598 : int16 relnatts;
599 : Oid *opUsedForQual;
600 :
3554 kgrittn 601 GIC 33 : initStringInfo(&querybuf);
1539 andres 602 CBC 33 : matviewRel = table_open(matviewOid, NoLock);
3554 kgrittn 603 33 : matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
2118 tgl 604 33 : RelationGetRelationName(matviewRel));
1539 andres 605 33 : tempRel = table_open(tempOid, NoLock);
3554 kgrittn 606 33 : tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
607 33 : RelationGetRelationName(tempRel));
608 33 : diffname = make_temptable_name_n(tempname, 2);
3554 kgrittn 609 ECB :
1828 teodor 610 GIC 33 : relnatts = RelationGetNumberOfAttributes(matviewRel);
3554 kgrittn 611 ECB :
612 : /* Open SPI context. */
3554 kgrittn 613 GIC 33 : if (SPI_connect() != SPI_OK_CONNECT)
3554 kgrittn 614 LBC 0 : elog(ERROR, "SPI_connect failed");
3554 kgrittn 615 EUB :
616 : /* Analyze the temp table with the new contents. */
3554 kgrittn 617 GIC 33 : appendStringInfo(&querybuf, "ANALYZE %s", tempname);
3554 kgrittn 618 CBC 33 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
3554 kgrittn 619 LBC 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
3554 kgrittn 620 EUB :
621 : /*
622 : * We need to ensure that there are not duplicate rows without NULLs in
623 : * the new data set before we can count on the "diff" results. Check for
624 : * that in a way that allows showing the first duplicated row found. Even
625 : * after we pass this test, a unique index on the materialized view may
626 : * find a duplicate key problem.
627 : *
628 : * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
629 : * keep ".*" from being expanded into multiple columns in a SELECT list.
630 : * Compare ruleutils.c's get_variable().
631 : */
3554 kgrittn 632 GIC 33 : resetStringInfo(&querybuf);
3554 kgrittn 633 CBC 33 : appendStringInfo(&querybuf,
610 tgl 634 ECB : "SELECT newdata.*::%s FROM %s newdata "
635 : "WHERE newdata.* IS NOT NULL AND EXISTS "
636 : "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
637 : "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
638 : "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
639 : "newdata.ctid)",
640 : tempname, tempname, tempname);
3554 kgrittn 641 GIC 33 : if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
3554 kgrittn 642 LBC 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
3554 kgrittn 643 GBC 33 : if (SPI_processed > 0)
3554 kgrittn 644 ECB : {
645 : /*
646 : * Note that this ereport() is returning data to the user. Generally,
647 : * we would want to make sure that the user has been granted access to
648 : * this data. However, REFRESH MAT VIEW is only able to be run by the
649 : * owner of the mat view (or a superuser) and therefore there is no
650 : * need to check for access to data in the mat view.
651 : */
3554 kgrittn 652 GIC 3 : ereport(ERROR,
3554 kgrittn 653 ECB : (errcode(ERRCODE_CARDINALITY_VIOLATION),
654 : errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
655 : RelationGetRelationName(matviewRel)),
656 : errdetail("Row: %s",
657 : SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
658 : }
659 :
3148 kgrittn 660 GIC 30 : SetUserIdAndSecContext(relowner,
3148 kgrittn 661 ECB : save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
662 :
663 : /* Start building the query for creating the diff table. */
3554 kgrittn 664 GIC 30 : resetStringInfo(&querybuf);
3554 kgrittn 665 CBC 30 : appendStringInfo(&querybuf,
3554 kgrittn 666 ECB : "CREATE TEMP TABLE %s AS "
667 : "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
668 : "FROM %s mv FULL JOIN %s newdata ON (",
669 : diffname, tempname, matviewname, tempname);
670 :
671 : /*
672 : * Get the list of index OIDs for the table from the relcache, and look up
673 : * each one in the pg_index syscache. We will test for equality on all
674 : * columns present in all unique indexes which only reference columns and
675 : * include all rows.
676 : */
3554 kgrittn 677 GIC 30 : tupdesc = matviewRel->rd_att;
1847 tgl 678 CBC 30 : opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
3554 kgrittn 679 30 : foundUniqueIndex = false;
1847 tgl 680 ECB :
3554 kgrittn 681 GIC 30 : indexoidlist = RelationGetIndexList(matviewRel);
3554 kgrittn 682 ECB :
3554 kgrittn 683 GIC 66 : foreach(indexoidscan, indexoidlist)
3554 kgrittn 684 ECB : {
3554 kgrittn 685 GIC 36 : Oid indexoid = lfirst_oid(indexoidscan);
3534 kgrittn 686 ECB : Relation indexRel;
687 :
3534 kgrittn 688 GIC 36 : indexRel = index_open(indexoid, RowExclusiveLock);
1847 tgl 689 CBC 36 : if (is_usable_unique_index(indexRel))
3554 kgrittn 690 ECB : {
1847 tgl 691 GIC 36 : Form_pg_index indexStruct = indexRel->rd_index;
1828 teodor 692 CBC 36 : int indnkeyatts = indexStruct->indnkeyatts;
1847 tgl 693 ECB : oidvector *indclass;
694 : Datum indclassDatum;
695 : int i;
696 :
697 : /* Must get indclass the hard way. */
15 dgustafsson 698 GNC 36 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
699 36 : indexRel->rd_indextuple,
700 : Anum_pg_index_indclass);
1847 tgl 701 GIC 36 : indclass = (oidvector *) DatumGetPointer(indclassDatum);
1847 tgl 702 ECB :
703 : /* Add quals for all columns from this index. */
1828 teodor 704 CBC 80 : for (i = 0; i < indnkeyatts; i++)
3554 kgrittn 705 ECB : {
3534 kgrittn 706 CBC 44 : int attnum = indexStruct->indkey.values[i];
1847 tgl 707 44 : Oid opclass = indclass->values[i];
2058 andres 708 GIC 44 : Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1847 tgl 709 44 : Oid attrtype = attr->atttypid;
710 : HeapTuple cla_ht;
711 : Form_pg_opclass cla_tup;
712 : Oid opfamily;
713 : Oid opcintype;
714 : Oid op;
715 : const char *leftop;
716 : const char *rightop;
717 :
718 : /*
719 : * Identify the equality operator associated with this index
1847 tgl 720 ECB : * column. First we need to look up the column's opclass.
3554 kgrittn 721 : */
1847 tgl 722 GBC 44 : cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
1847 tgl 723 CBC 44 : if (!HeapTupleIsValid(cla_ht))
1847 tgl 724 LBC 0 : elog(ERROR, "cache lookup failed for opclass %u", opclass);
1847 tgl 725 CBC 44 : cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
726 44 : Assert(cla_tup->opcmethod == BTREE_AM_OID);
727 44 : opfamily = cla_tup->opcfamily;
1847 tgl 728 GIC 44 : opcintype = cla_tup->opcintype;
1847 tgl 729 CBC 44 : ReleaseSysCache(cla_ht);
730 :
731 44 : op = get_opfamily_member(opfamily, opcintype, opcintype,
1847 tgl 732 EUB : BTEqualStrategyNumber);
1847 tgl 733 GIC 44 : if (!OidIsValid(op))
1847 tgl 734 UIC 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
735 : BTEqualStrategyNumber, opcintype, opcintype, opfamily);
736 :
737 : /*
738 : * If we find the same column with the same equality semantics
739 : * in more than one index, we only need to emit the equality
740 : * clause once.
741 : *
742 : * Since we only remember the last equality operator, this
743 : * code could be fooled into emitting duplicate clauses given
744 : * multiple indexes with several different opclasses ... but
745 : * that's so unlikely it doesn't seem worth spending extra
1847 tgl 746 ECB : * code to avoid.
1847 tgl 747 EUB : */
1847 tgl 748 CBC 44 : if (opUsedForQual[attnum - 1] == op)
3554 kgrittn 749 UIC 0 : continue;
1847 tgl 750 GIC 44 : opUsedForQual[attnum - 1] = op;
751 :
752 : /*
3554 kgrittn 753 ECB : * Actually add the qual, ANDed with any others.
754 : */
3554 kgrittn 755 GIC 44 : if (foundUniqueIndex)
3554 kgrittn 756 CBC 14 : appendStringInfoString(&querybuf, " AND ");
3554 kgrittn 757 ECB :
610 tgl 758 CBC 44 : leftop = quote_qualified_identifier("newdata",
1847 759 44 : NameStr(attr->attname));
610 tgl 760 GIC 44 : rightop = quote_qualified_identifier("mv",
1847 tgl 761 CBC 44 : NameStr(attr->attname));
762 :
1847 tgl 763 GIC 44 : generate_operator_clause(&querybuf,
764 : leftop, attrtype,
765 : op,
1847 tgl 766 ECB : rightop, attrtype);
767 :
3554 kgrittn 768 GIC 44 : foundUniqueIndex = true;
769 : }
770 : }
3309 tgl 771 ECB :
772 : /* Keep the locks, since we're about to run DML which needs them. */
3309 tgl 773 GIC 36 : index_close(indexRel, NoLock);
3554 kgrittn 774 ECB : }
775 :
3554 kgrittn 776 GIC 30 : list_free(indexoidlist);
777 :
778 : /*
779 : * There must be at least one usable unique index on the matview.
780 : *
781 : * ExecRefreshMatView() checks that after taking the exclusive lock on the
782 : * matview. So at least one unique index is guaranteed to exist here
1847 tgl 783 ECB : * because the lock is still being held; so an Assert seems sufficient.
784 : */
2609 fujii 785 CBC 30 : Assert(foundUniqueIndex);
786 :
3554 kgrittn 787 GIC 30 : appendStringInfoString(&querybuf,
788 : " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
789 : "WHERE newdata.* IS NULL OR mv.* IS NULL "
790 : "ORDER BY tid");
3554 kgrittn 791 ECB :
3554 kgrittn 792 EUB : /* Create the temporary "diff" table. */
3554 kgrittn 793 GIC 30 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
3554 kgrittn 794 LBC 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
795 :
3148 kgrittn 796 GIC 30 : SetUserIdAndSecContext(relowner,
797 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
798 :
799 : /*
800 : * We have no further use for data from the "full-data" temp table, but we
801 : * must keep it around because its type is referenced from the diff table.
802 : */
3554 kgrittn 803 ECB :
804 : /* Analyze the diff table. */
3554 kgrittn 805 CBC 30 : resetStringInfo(&querybuf);
3554 kgrittn 806 GBC 30 : appendStringInfo(&querybuf, "ANALYZE %s", diffname);
3554 kgrittn 807 GIC 30 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
3554 kgrittn 808 LBC 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
809 :
3554 kgrittn 810 GIC 30 : OpenMatViewIncrementalMaintenance();
3554 kgrittn 811 ECB :
812 : /* Deletes must come before inserts; do them first. */
3554 kgrittn 813 GIC 30 : resetStringInfo(&querybuf);
814 30 : appendStringInfo(&querybuf,
815 : "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
816 : "(SELECT diff.tid FROM %s diff "
817 : "WHERE diff.tid IS NOT NULL "
610 tgl 818 ECB : "AND diff.newdata IS NULL)",
3554 kgrittn 819 EUB : matviewname, diffname);
3554 kgrittn 820 GIC 30 : if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
3554 kgrittn 821 UIC 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
3554 kgrittn 822 ECB :
823 : /* Inserts go last. */
3554 kgrittn 824 GIC 30 : resetStringInfo(&querybuf);
825 30 : appendStringInfo(&querybuf,
826 : "INSERT INTO %s SELECT (diff.newdata).* "
610 tgl 827 ECB : "FROM %s diff WHERE tid IS NULL",
3554 kgrittn 828 EUB : matviewname, diffname);
3554 kgrittn 829 GIC 30 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
3554 kgrittn 830 UIC 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
3554 kgrittn 831 ECB :
832 : /* We're done maintaining the materialized view. */
3554 kgrittn 833 CBC 30 : CloseMatViewIncrementalMaintenance();
1539 andres 834 GIC 30 : table_close(tempRel, NoLock);
835 30 : table_close(matviewRel, NoLock);
3554 kgrittn 836 ECB :
837 : /* Clean up temp tables. */
3554 kgrittn 838 CBC 30 : resetStringInfo(&querybuf);
3554 kgrittn 839 GBC 30 : appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
3554 kgrittn 840 GIC 30 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
3554 kgrittn 841 UIC 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
3554 kgrittn 842 ECB :
3554 kgrittn 843 EUB : /* Close SPI context. */
3554 kgrittn 844 CBC 30 : if (SPI_finish() != SPI_OK_FINISH)
3554 kgrittn 845 UIC 0 : elog(ERROR, "SPI_finish failed");
3554 kgrittn 846 GIC 30 : }
847 :
848 : /*
849 : * Swap the physical files of the target and transient tables, then rebuild
850 : * the target's indexes and throw away the transient table. Security context
851 : * swapping is handled by the called function, so it is not needed here.
3554 kgrittn 852 ECB : */
853 : static void
3067 alvherre 854 CBC 63 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
855 : {
3554 kgrittn 856 63 : finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
857 : RecentXmin, ReadNextMultiXactId(), relpersistence);
3554 kgrittn 858 GIC 60 : }
859 :
860 : /*
861 : * Check whether specified index is usable for match merge.
1847 tgl 862 ECB : */
863 : static bool
1847 tgl 864 CBC 75 : is_usable_unique_index(Relation indexRel)
865 : {
1847 tgl 866 GIC 75 : Form_pg_index indexStruct = indexRel->rd_index;
867 :
868 : /*
869 : * Must be unique, valid, immediate, non-partial, and be defined over
870 : * plain user columns (not expressions). We also require it to be a
871 : * btree. Even if we had any other unique index kinds, we'd not know how
872 : * to identify the corresponding equality operator, nor could we be sure
873 : * that the planner could implement the required FULL JOIN with non-btree
1847 tgl 874 ECB : * operators.
875 : */
1847 tgl 876 CBC 75 : if (indexStruct->indisunique &&
877 75 : indexStruct->indimmediate &&
878 75 : indexRel->rd_rel->relam == BTREE_AM_OID &&
1564 peter_e 879 150 : indexStruct->indisvalid &&
1847 tgl 880 GIC 75 : RelationGetIndexPredicate(indexRel) == NIL &&
881 72 : indexStruct->indnatts > 0)
882 : {
883 : /*
884 : * The point of groveling through the index columns individually is to
885 : * reject both index expressions and system columns. Currently,
886 : * matviews couldn't have OID columns so there's no way to create an
887 : * index on a system column; but maybe someday that wouldn't be true,
1847 tgl 888 ECB : * so let's be safe.
889 : */
1847 tgl 890 GIC 72 : int numatts = indexStruct->indnatts;
1847 tgl 891 ECB : int i;
892 :
1847 tgl 893 CBC 157 : for (i = 0; i < numatts; i++)
894 : {
895 88 : int attnum = indexStruct->indkey.values[i];
1847 tgl 896 ECB :
1847 tgl 897 GIC 88 : if (attnum <= 0)
1847 tgl 898 CBC 3 : return false;
899 : }
900 69 : return true;
901 : }
1847 tgl 902 GIC 3 : return false;
903 : }
904 :
905 :
906 : /*
907 : * This should be used to test whether the backend is in a context where it is
908 : * OK to allow DML statements to modify materialized views. We only want to
909 : * allow that for internal code driven by the materialized view definition,
910 : * not for arbitrary user-supplied code.
911 : *
912 : * While the function names reflect the fact that their main intended use is
913 : * incremental maintenance of materialized views (in response to changes to
914 : * the data in referenced relations), they are initially used to allow REFRESH
915 : * without blocking concurrent reads.
3554 kgrittn 916 ECB : */
917 : bool
3554 kgrittn 918 CBC 60 : MatViewIncrementalMaintenanceIsEnabled(void)
919 : {
3554 kgrittn 920 GIC 60 : return matview_maintenance_depth > 0;
921 : }
3538 kgrittn 922 ECB :
923 : static void
3538 kgrittn 924 CBC 30 : OpenMatViewIncrementalMaintenance(void)
3538 kgrittn 925 ECB : {
3538 kgrittn 926 GIC 30 : matview_maintenance_depth++;
927 30 : }
3538 kgrittn 928 ECB :
929 : static void
3538 kgrittn 930 CBC 30 : CloseMatViewIncrementalMaintenance(void)
3538 kgrittn 931 ECB : {
3538 kgrittn 932 CBC 30 : matview_maintenance_depth--;
3538 kgrittn 933 GIC 30 : Assert(matview_maintenance_depth >= 0);
934 30 : }
|