Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/relscan.h"
19 : #include "access/tableam.h"
20 : #include "access/transam.h"
21 : #include "access/xact.h"
22 : #include "commands/trigger.h"
23 : #include "executor/executor.h"
24 : #include "executor/nodeModifyTable.h"
25 : #include "nodes/nodeFuncs.h"
26 : #include "parser/parse_relation.h"
27 : #include "parser/parsetree.h"
28 : #include "replication/logicalrelation.h"
29 : #include "storage/bufmgr.h"
30 : #include "storage/lmgr.h"
31 : #include "utils/builtins.h"
32 : #include "utils/datum.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/snapmgr.h"
37 : #include "utils/syscache.h"
38 : #include "utils/typcache.h"
39 :
40 :
41 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
42 : TypeCacheEntry **eq);
43 :
44 : /*
45 : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
46 : * is setup to match 'rel' (*NOT* idxrel!).
47 : *
48 : * Returns how many columns to use for the index scan.
49 : *
50 : * This is not generic routine, it expects the idxrel to be a btree, non-partial
51 : * and have at least one column reference (i.e. cannot consist of only
52 : * expressions).
53 : *
54 : * By definition, replication identity of a rel meets all limitations associated
55 : * with that. Note that any other index could also meet these limitations.
56 : */
57 : static int
2271 peter_e 58 GIC 72071 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
59 : TupleTableSlot *searchslot)
60 : {
61 : int index_attoff;
25 akapila 62 GNC 72071 : int skey_attoff = 0;
63 : Datum indclassDatum;
64 : oidvector *opclass;
2271 peter_e 65 GIC 72071 : int2vector *indkey = &idxrel->rd_index->indkey;
66 :
15 dgustafsson 67 GNC 72071 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
68 : Anum_pg_index_indclass);
2271 peter_e 69 GIC 72071 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
2271 peter_e 70 ECB :
71 : /* Build scankey for every non-expression attribute in the index. */
25 akapila 72 GNC 144155 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
73 72084 : index_attoff++)
74 : {
75 : Oid operator;
76 : Oid optype;
2271 peter_e 77 ECB : Oid opfamily;
78 : RegProcedure regop;
25 akapila 79 GNC 72084 : int table_attno = indkey->values[index_attoff];
80 :
81 72084 : if (!AttributeNumberIsValid(table_attno))
82 : {
83 : /*
84 : * XXX: Currently, we don't support expressions in the scan key,
85 : * see code below.
86 : */
87 2 : continue;
88 : }
89 :
90 : /*
2271 peter_e 91 ECB : * Load the operator info. We need this to get the equality operator
92 : * function for the scan key.
93 : */
25 akapila 94 GNC 72082 : optype = get_opclass_input_type(opclass->values[index_attoff]);
95 72082 : opfamily = get_opclass_family(opclass->values[index_attoff]);
96 :
2271 peter_e 97 GIC 72082 : operator = get_opfamily_member(opfamily, optype,
98 : optype,
99 : BTEqualStrategyNumber);
2271 peter_e 100 CBC 72082 : if (!OidIsValid(operator))
2085 tgl 101 UIC 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
102 : BTEqualStrategyNumber, optype, optype, opfamily);
103 :
2271 peter_e 104 GIC 72082 : regop = get_opcode(operator);
105 :
106 : /* Initialize the scankey. */
25 akapila 107 GNC 72082 : ScanKeyInit(&skey[skey_attoff],
108 72082 : index_attoff + 1,
109 : BTEqualStrategyNumber,
2271 peter_e 110 ECB : regop,
25 akapila 111 GNC 72082 : searchslot->tts_values[table_attno - 1]);
112 :
113 72082 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
1479 peter 114 EUB :
115 : /* Check for null value. */
25 akapila 116 GNC 72082 : if (searchslot->tts_isnull[table_attno - 1])
117 1 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
118 :
119 72082 : skey_attoff++;
2271 peter_e 120 ECB : }
121 :
122 : /* There must always be at least one attribute for the index scan. */
25 akapila 123 GNC 72071 : Assert(skey_attoff > 0);
124 :
125 72071 : return skey_attoff;
2271 peter_e 126 ECB : }
127 :
128 : /*
129 : * Search the relation 'rel' for tuple using the index.
130 : *
131 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
132 : * contents, and return true. Return false otherwise.
133 : */
134 : bool
2271 peter_e 135 GIC 72071 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
136 : LockTupleMode lockmode,
137 : TupleTableSlot *searchslot,
2271 peter_e 138 ECB : TupleTableSlot *outslot)
139 : {
2153 bruce 140 : ScanKeyData skey[INDEX_MAX_KEYS];
141 : int skey_attoff;
142 : IndexScanDesc scan;
143 : SnapshotData snap;
144 : TransactionId xwait;
145 : Relation idxrel;
146 : bool found;
25 akapila 147 GNC 72071 : TypeCacheEntry **eq = NULL;
148 : bool isIdxSafeToSkipDuplicates;
149 :
150 : /* Open the index. */
2271 peter_e 151 GIC 72071 : idxrel = index_open(idxoid, RowExclusiveLock);
152 :
25 akapila 153 GNC 72071 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
154 :
2271 peter_e 155 GIC 72071 : InitDirtySnapshot(snap);
156 :
157 : /* Build scan key. */
25 akapila 158 GNC 72071 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
159 :
160 : /* Start an index scan. */
161 72071 : scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
162 :
2271 peter_e 163 UIC 0 : retry:
2271 peter_e 164 GIC 72071 : found = false;
165 :
25 akapila 166 GNC 72071 : index_rescan(scan, skey, skey_attoff, NULL, 0);
167 :
168 : /* Try to find the tuple */
169 72071 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
2271 peter_e 170 ECB : {
171 : /*
172 : * Avoid expensive equality check if the index is primary key or
173 : * replica identity index.
174 : */
25 akapila 175 GNC 72063 : if (!isIdxSafeToSkipDuplicates)
176 : {
177 11 : if (eq == NULL)
178 : {
179 : #ifdef USE_ASSERT_CHECKING
180 : /* apply assertions only once for the input idxoid */
181 11 : IndexInfo *indexInfo = BuildIndexInfo(idxrel);
182 :
183 11 : Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
184 : #endif
185 :
186 11 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
187 : }
188 :
189 11 : if (!tuples_equal(outslot, searchslot, eq))
25 akapila 190 UNC 0 : continue;
191 : }
192 :
2271 peter_e 193 CBC 72063 : ExecMaterializeSlot(outslot);
194 :
195 144126 : xwait = TransactionIdIsValid(snap.xmin) ?
2271 peter_e 196 GIC 72063 : snap.xmin : snap.xmax;
197 :
2271 peter_e 198 ECB : /*
199 : * If the tuple is locked, wait for locking transaction to finish and
200 : * retry.
201 : */
2271 peter_e 202 GIC 72063 : if (TransactionIdIsValid(xwait))
2271 peter_e 203 EUB : {
2271 peter_e 204 LBC 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
2271 peter_e 205 UIC 0 : goto retry;
2271 peter_e 206 ECB : }
207 :
208 : /* Found our tuple and it's not locked */
25 akapila 209 GNC 72063 : found = true;
210 72063 : break;
211 : }
212 :
2271 peter_e 213 ECB : /* Found tuple, try to lock it in the lockmode. */
2271 peter_e 214 GIC 72071 : if (found)
215 : {
216 : TM_FailureData tmfd;
217 : TM_Result res;
218 :
2271 peter_e 219 CBC 72063 : PushActiveSnapshot(GetLatestSnapshot());
220 :
1417 andres 221 72063 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
222 : outslot,
223 : GetCurrentCommandId(false),
224 : lockmode,
1478 andres 225 ECB : LockWaitBlock,
226 : 0 /* don't follow updates */ ,
227 : &tmfd);
228 :
2271 peter_e 229 GIC 72063 : PopActiveSnapshot();
2271 peter_e 230 ECB :
2271 peter_e 231 GIC 72063 : switch (res)
232 : {
1478 andres 233 CBC 72063 : case TM_Ok:
2271 peter_e 234 GBC 72063 : break;
1478 andres 235 UIC 0 : case TM_Updated:
236 : /* XXX: Improve handling here */
1478 andres 237 LBC 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
1828 andres 238 UIC 0 : ereport(LOG,
1828 andres 239 ECB : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
240 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
241 : else
1828 andres 242 UIC 0 : ereport(LOG,
243 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
244 : errmsg("concurrent update, retrying")));
2271 peter_e 245 0 : goto retry;
1478 andres 246 LBC 0 : case TM_Deleted:
247 : /* XXX: Improve handling here */
1478 andres 248 UBC 0 : ereport(LOG,
1478 andres 249 EUB : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
250 : errmsg("concurrent delete, retrying")));
1478 andres 251 UIC 0 : goto retry;
252 0 : case TM_Invisible:
2271 peter_e 253 LBC 0 : elog(ERROR, "attempted to lock invisible tuple");
1804 tgl 254 ECB : break;
2271 peter_e 255 UIC 0 : default:
1417 andres 256 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
257 : break;
2271 peter_e 258 ECB : }
259 : }
260 :
2271 peter_e 261 GIC 72071 : index_endscan(scan);
262 :
2271 peter_e 263 ECB : /* Don't release lock until commit. */
2271 peter_e 264 GIC 72071 : index_close(idxrel, NoLock);
2271 peter_e 265 ECB :
2271 peter_e 266 GIC 72071 : return found;
267 : }
268 :
269 : /*
270 : * Compare the tuples in the slots by checking if they have equal values.
271 : */
272 : static bool
1093 noah 273 CBC 105276 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
274 : TypeCacheEntry **eq)
2271 peter_e 275 ECB : {
276 : int attrnum;
277 :
1490 andres 278 CBC 105276 : Assert(slot1->tts_tupleDescriptor->natts ==
1490 andres 279 EUB : slot2->tts_tupleDescriptor->natts);
280 :
1490 andres 281 GBC 105276 : slot_getallattrs(slot1);
282 105276 : slot_getallattrs(slot2);
283 :
284 : /* Check equality of the attributes. */
1490 andres 285 GIC 105455 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
2271 peter_e 286 EUB : {
287 : Form_pg_attribute att;
288 : TypeCacheEntry *typentry;
2116 289 :
19 akapila 290 GBC 105300 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
291 :
19 akapila 292 EUB : /*
293 : * Ignore dropped and generated columns as the publisher doesn't send
294 : * those
295 : */
17 akapila 296 GBC 105300 : if (att->attisdropped || att->attgenerated)
19 297 2 : continue;
298 :
2271 peter_e 299 EUB : /*
300 : * If one value is NULL and other is not, then they are certainly not
301 : * equal
302 : */
1490 andres 303 GIC 105298 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
2271 peter_e 304 UIC 0 : return false;
2271 peter_e 305 ECB :
306 : /*
307 : * If both are NULL, they can be considered equal.
308 : */
1490 andres 309 GIC 105298 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
2271 peter_e 310 CBC 1 : continue;
311 :
1093 noah 312 GIC 105297 : typentry = eq[attrnum];
313 105297 : if (typentry == NULL)
314 : {
315 176 : typentry = lookup_type_cache(att->atttypid,
316 : TYPECACHE_EQ_OPR_FINFO);
1093 noah 317 CBC 176 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1093 noah 318 UIC 0 : ereport(ERROR,
319 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
320 : errmsg("could not identify an equality operator for type %s",
321 : format_type_be(att->atttypid))));
1093 noah 322 CBC 176 : eq[attrnum] = typentry;
323 : }
324 :
1479 peter 325 105297 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
1479 peter 326 ECB : att->attcollation,
1418 tgl 327 GIC 105297 : slot1->tts_values[attrnum],
328 105297 : slot2->tts_values[attrnum])))
2271 peter_e 329 CBC 105121 : return false;
330 : }
331 :
2271 peter_e 332 GIC 155 : return true;
333 : }
2271 peter_e 334 ECB :
335 : /*
336 : * Search the relation 'rel' for tuple using the sequential scan.
337 : *
338 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
339 : * contents, and return true. Return false otherwise.
340 : *
341 : * Note that this stops on the first matching tuple.
342 : *
343 : * This can obviously be quite slow on tables that have more than few rows.
344 : */
345 : bool
2271 peter_e 346 GIC 144 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
2271 peter_e 347 ECB : TupleTableSlot *searchslot, TupleTableSlot *outslot)
2271 peter_e 348 EUB : {
349 : TupleTableSlot *scanslot;
350 : TableScanDesc scan;
351 : SnapshotData snap;
352 : TypeCacheEntry **eq;
2153 bruce 353 ECB : TransactionId xwait;
354 : bool found;
1490 andres 355 GIC 144 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
2271 peter_e 356 ECB :
2271 peter_e 357 CBC 144 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
358 :
1093 noah 359 144 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
360 :
1945 peter_e 361 ECB : /* Start a heap scan. */
2271 peter_e 362 GBC 144 : InitDirtySnapshot(snap);
1490 andres 363 GIC 144 : scan = table_beginscan(rel, &snap, 0, NULL);
364 144 : scanslot = table_slot_create(rel, NULL);
365 :
2271 peter_e 366 LBC 0 : retry:
2271 peter_e 367 GIC 144 : found = false;
368 :
1490 andres 369 CBC 144 : table_rescan(scan, NULL);
370 :
2271 peter_e 371 ECB : /* Try to find the tuple */
1490 andres 372 CBC 105265 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
2271 peter_e 373 ECB : {
1093 noah 374 GIC 105265 : if (!tuples_equal(scanslot, searchslot, eq))
2271 peter_e 375 105121 : continue;
2271 peter_e 376 ECB :
2271 peter_e 377 GIC 144 : found = true;
1490 andres 378 144 : ExecCopySlot(outslot, scanslot);
379 :
2271 peter_e 380 288 : xwait = TransactionIdIsValid(snap.xmin) ?
381 144 : snap.xmin : snap.xmax;
382 :
383 : /*
384 : * If the tuple is locked, wait for locking transaction to finish and
385 : * retry.
386 : */
387 144 : if (TransactionIdIsValid(xwait))
388 : {
2271 peter_e 389 UIC 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
2271 peter_e 390 LBC 0 : goto retry;
391 : }
392 :
393 : /* Found our tuple and it's not locked */
1161 alvherre 394 GIC 144 : break;
395 : }
396 :
397 : /* Found tuple, try to lock it in the lockmode. */
2271 peter_e 398 144 : if (found)
2271 peter_e 399 ECB : {
400 : TM_FailureData tmfd;
1478 andres 401 : TM_Result res;
402 :
2271 peter_e 403 CBC 144 : PushActiveSnapshot(GetLatestSnapshot());
404 :
1417 andres 405 GIC 144 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
1478 andres 406 ECB : outslot,
407 : GetCurrentCommandId(false),
408 : lockmode,
409 : LockWaitBlock,
1478 andres 410 EUB : 0 /* don't follow updates */ ,
1478 andres 411 ECB : &tmfd);
412 :
2271 peter_e 413 CBC 144 : PopActiveSnapshot();
414 :
2271 peter_e 415 GIC 144 : switch (res)
2271 peter_e 416 ECB : {
1478 andres 417 GIC 144 : case TM_Ok:
2271 peter_e 418 CBC 144 : break;
1478 andres 419 LBC 0 : case TM_Updated:
420 : /* XXX: Improve handling here */
421 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
1828 422 0 : ereport(LOG,
423 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
1828 andres 424 ECB : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
425 : else
1828 andres 426 UIC 0 : ereport(LOG,
427 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
428 : errmsg("concurrent update, retrying")));
2271 peter_e 429 0 : goto retry;
1478 andres 430 0 : case TM_Deleted:
1478 andres 431 ECB : /* XXX: Improve handling here */
1478 andres 432 UIC 0 : ereport(LOG,
1478 andres 433 EUB : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
434 : errmsg("concurrent delete, retrying")));
1478 andres 435 UIC 0 : goto retry;
436 0 : case TM_Invisible:
2271 peter_e 437 0 : elog(ERROR, "attempted to lock invisible tuple");
1804 tgl 438 ECB : break;
2271 peter_e 439 UIC 0 : default:
1417 andres 440 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
441 : break;
2271 peter_e 442 ECB : }
443 : }
444 :
1490 andres 445 GIC 144 : table_endscan(scan);
446 144 : ExecDropSingleTupleTableSlot(scanslot);
2271 peter_e 447 ECB :
2271 peter_e 448 GIC 144 : return found;
2271 peter_e 449 ECB : }
450 :
451 : /*
452 : * Insert tuple represented in the slot to the relation, update the indexes,
453 : * and execute any constraints and per-row triggers.
454 : *
455 : * Caller is responsible for opening the indexes.
456 : */
457 : void
907 heikki.linnakangas 458 GIC 75588 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
907 heikki.linnakangas 459 ECB : EState *estate, TupleTableSlot *slot)
460 : {
2153 bruce 461 CBC 75588 : bool skip_tuple = false;
462 75588 : Relation rel = resultRelInfo->ri_RelationDesc;
2271 peter_e 463 EUB :
464 : /* For now we support only tables. */
2271 peter_e 465 GBC 75588 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
2271 peter_e 466 EUB :
2271 peter_e 467 GIC 75588 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
468 :
469 : /* BEFORE ROW INSERT Triggers */
2271 peter_e 470 GBC 75588 : if (resultRelInfo->ri_TrigDesc &&
2271 peter_e 471 GIC 17 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
472 : {
1503 andres 473 GBC 1 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
1418 tgl 474 1 : skip_tuple = true; /* "do nothing" */
475 : }
2271 peter_e 476 EUB :
2271 peter_e 477 GIC 75588 : if (!skip_tuple)
478 : {
2271 peter_e 479 GBC 75587 : List *recheckIndexes = NIL;
2271 peter_e 480 EUB :
1471 peter 481 : /* Compute stored generated columns */
1471 peter 482 GIC 75587 : if (rel->rd_att->constr &&
1471 peter 483 GBC 45388 : rel->rd_att->constr->has_generated_stored)
907 heikki.linnakangas 484 UBC 0 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
485 : CMD_INSERT);
486 :
487 : /* Check the constraints of the tuple */
2271 peter_e 488 GIC 75587 : if (rel->rd_att->constr)
1763 alvherre 489 CBC 45388 : ExecConstraints(resultRelInfo, slot, estate);
935 tgl 490 75587 : if (rel->rd_rel->relispartition)
1763 alvherre 491 GIC 60 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
2271 peter_e 492 ECB :
493 : /* OK, store the tuple and create index entries for it */
1417 andres 494 GIC 75587 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
495 :
2271 peter_e 496 75587 : if (resultRelInfo->ri_NumIndices > 0)
907 heikki.linnakangas 497 55333 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
498 : slot, estate, false, false,
499 : NULL, NIL, false);
500 :
501 : /* AFTER ROW INSERT Triggers */
1503 andres 502 CBC 75578 : ExecARInsertTriggers(estate, resultRelInfo, slot,
503 : recheckIndexes, NULL);
504 :
2111 rhodiumtoad 505 ECB : /*
506 : * XXX we should in theory pass a TransitionCaptureState object to the
507 : * above to capture transition tuples, but after statement triggers
508 : * don't actually get fired by replication yet anyway
509 : */
510 :
2271 peter_e 511 CBC 75578 : list_free(recheckIndexes);
512 : }
2271 peter_e 513 GIC 75579 : }
2271 peter_e 514 ECB :
515 : /*
516 : * Find the searchslot tuple and update it with data in the slot,
517 : * update the indexes, and execute any constraints and per-row triggers.
518 : *
519 : * Caller is responsible for opening the indexes.
520 : */
521 : void
907 heikki.linnakangas 522 GIC 31907 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
907 heikki.linnakangas 523 ECB : EState *estate, EPQState *epqstate,
524 : TupleTableSlot *searchslot, TupleTableSlot *slot)
525 : {
2153 bruce 526 CBC 31907 : bool skip_tuple = false;
527 31907 : Relation rel = resultRelInfo->ri_RelationDesc;
1478 andres 528 GBC 31907 : ItemPointer tid = &(searchslot->tts_tid);
529 :
530 : /* For now we support only tables. */
2271 peter_e 531 GIC 31907 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
2271 peter_e 532 ECB :
2271 peter_e 533 CBC 31907 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
2271 peter_e 534 ECB :
2006 rhaas 535 : /* BEFORE ROW UPDATE Triggers */
2271 peter_e 536 GIC 31907 : if (resultRelInfo->ri_TrigDesc &&
537 9 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
2271 peter_e 538 ECB : {
1503 andres 539 GIC 2 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
540 : tid, NULL, slot, NULL, NULL))
1418 tgl 541 CBC 2 : skip_tuple = true; /* "do nothing" */
542 : }
543 :
2271 peter_e 544 GIC 31907 : if (!skip_tuple)
545 : {
2271 peter_e 546 CBC 31905 : List *recheckIndexes = NIL;
547 : TU_UpdateIndexes update_indexes;
548 :
549 : /* Compute stored generated columns */
1471 peter 550 GIC 31905 : if (rel->rd_att->constr &&
551 31866 : rel->rd_att->constr->has_generated_stored)
907 heikki.linnakangas 552 1 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
553 : CMD_UPDATE);
554 :
2271 peter_e 555 ECB : /* Check the constraints of the tuple */
2271 peter_e 556 GIC 31905 : if (rel->rd_att->constr)
1763 alvherre 557 CBC 31866 : ExecConstraints(resultRelInfo, slot, estate);
935 tgl 558 GIC 31905 : if (rel->rd_rel->relispartition)
1763 alvherre 559 11 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
560 :
1417 andres 561 31905 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
562 : &update_indexes);
563 :
20 tomas.vondra 564 GNC 31905 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
907 heikki.linnakangas 565 GIC 20185 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
816 pg 566 ECB : slot, estate, true, false,
567 : NULL, NIL,
568 : (update_indexes == TU_Summarizing));
569 :
570 : /* AFTER ROW UPDATE Triggers */
2271 peter_e 571 CBC 31905 : ExecARUpdateTriggers(estate, resultRelInfo,
385 alvherre 572 ECB : NULL, NULL,
1478 andres 573 : tid, NULL, slot,
574 : recheckIndexes, NULL, false);
575 :
2271 peter_e 576 CBC 31905 : list_free(recheckIndexes);
577 : }
578 31907 : }
579 :
580 : /*
2271 peter_e 581 ECB : * Find the searchslot tuple and delete it, and execute any constraints
582 : * and per-row triggers.
583 : *
584 : * Caller is responsible for opening the indexes.
585 : */
586 : void
907 heikki.linnakangas 587 GIC 40299 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
588 : EState *estate, EPQState *epqstate,
2271 peter_e 589 ECB : TupleTableSlot *searchslot)
590 : {
2153 bruce 591 CBC 40299 : bool skip_tuple = false;
2153 bruce 592 GIC 40299 : Relation rel = resultRelInfo->ri_RelationDesc;
1478 andres 593 40299 : ItemPointer tid = &searchslot->tts_tid;
594 :
2271 peter_e 595 CBC 40299 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
2271 peter_e 596 ECB :
2006 rhaas 597 : /* BEFORE ROW DELETE Triggers */
2271 peter_e 598 GIC 40299 : if (resultRelInfo->ri_TrigDesc &&
2005 rhaas 599 10 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
600 : {
2271 peter_e 601 LBC 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
27 dean.a.rasheed 602 UNC 0 : tid, NULL, NULL, NULL, NULL);
2271 peter_e 603 ECB : }
604 :
2271 peter_e 605 GIC 40299 : if (!skip_tuple)
2271 peter_e 606 ECB : {
607 : /* OK, delete the tuple */
1417 andres 608 GIC 40299 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
2271 peter_e 609 ECB :
610 : /* AFTER ROW DELETE Triggers */
2271 peter_e 611 GIC 40299 : ExecARDeleteTriggers(estate, resultRelInfo,
612 : tid, NULL, NULL, false);
613 : }
614 40299 : }
615 :
2271 peter_e 616 ECB : /*
617 : * Check if command can be executed with current replica identity.
618 : */
619 : void
2271 peter_e 620 GIC 218870 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
2271 peter_e 621 ECB : {
622 : PublicationDesc pubdesc;
623 :
624 : /*
625 : * Skip checking the replica identity for partitioned tables, because the
626 : * operations are actually performed on the leaf partitions.
627 : */
236 akapila 628 GIC 218870 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
629 207040 : return;
630 :
631 : /* We only need to do checks for UPDATE and DELETE. */
2271 peter_e 632 CBC 216101 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
2271 peter_e 633 GIC 129515 : return;
634 :
635 : /*
411 akapila 636 ECB : * It is only safe to execute UPDATE/DELETE when all columns, referenced
637 : * in the row filters from publications which the relation is in, are
638 : * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
639 : * or the table does not publish UPDATEs or DELETEs.
640 : *
641 : * XXX We could optimize it by first checking whether any of the
642 : * publications have a row filter for this relation. If not and relation
643 : * has replica identity then we can avoid building the descriptor but as
644 : * this happens only one time it doesn't seem worth the additional
645 : * complexity.
411 akapila 646 EUB : */
411 akapila 647 GBC 86586 : RelationBuildPublicationDesc(rel, &pubdesc);
411 akapila 648 GIC 86586 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
649 30 : ereport(ERROR,
411 akapila 650 ECB : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
651 : errmsg("cannot update table \"%s\"",
652 : RelationGetRelationName(rel)),
653 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
379 tomas.vondra 654 GIC 86556 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
655 54 : ereport(ERROR,
379 tomas.vondra 656 ECB : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
657 : errmsg("cannot update table \"%s\"",
658 : RelationGetRelationName(rel)),
659 : errdetail("Column list used by the publication does not cover the replica identity.")));
411 akapila 660 GIC 86502 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
411 akapila 661 UIC 0 : ereport(ERROR,
662 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
663 : errmsg("cannot delete from table \"%s\"",
664 : RelationGetRelationName(rel)),
411 akapila 665 ECB : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
379 tomas.vondra 666 GIC 86502 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
379 tomas.vondra 667 UIC 0 : ereport(ERROR,
668 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
669 : errmsg("cannot delete from table \"%s\"",
670 : RelationGetRelationName(rel)),
671 : errdetail("Column list used by the publication does not cover the replica identity.")));
672 :
2271 peter_e 673 ECB : /* If relation has replica identity we are always good. */
411 akapila 674 CBC 86502 : if (OidIsValid(RelationGetReplicaIndex(rel)))
2271 peter_e 675 GIC 74556 : return;
676 :
379 tomas.vondra 677 ECB : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
379 tomas.vondra 678 CBC 11946 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
379 tomas.vondra 679 GIC 200 : return;
680 :
681 : /*
682 : * This is UPDATE/DELETE and there is no replica identity.
683 : *
684 : * Check if the table publishes UPDATES or DELETES.
685 : */
411 akapila 686 11746 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
2271 peter_e 687 45 : ereport(ERROR,
688 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
689 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
690 : RelationGetRelationName(rel)),
691 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
411 akapila 692 CBC 11701 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
2271 peter_e 693 LBC 0 : ereport(ERROR,
2271 peter_e 694 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
695 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
696 : RelationGetRelationName(rel)),
697 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
698 : }
2154 699 :
700 :
701 : /*
702 : * Check if we support writing into specific relkind.
703 : *
704 : * The nspname and relname are only needed for error reporting.
705 : */
2154 peter_e 706 EUB : void
2154 peter_e 707 GIC 741 : CheckSubscriptionRelkind(char relkind, const char *nspname,
708 : const char *relname)
709 : {
367 tomas.vondra 710 741 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
2154 peter_e 711 LBC 0 : ereport(ERROR,
2154 peter_e 712 EUB : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
713 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
714 : nspname, relname),
715 : errdetail_relkind_not_supported(relkind)));
2154 peter_e 716 GIC 741 : }
|