LCOV - differential code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 81.9 % 227 186 2 12 25 2 15 97 33 41 24 120 13
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 9 9 9 7 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * execReplication.c
       4                 :  *    miscellaneous executor routines for logical replication
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * IDENTIFICATION
      10                 :  *    src/backend/executor/execReplication.c
      11                 :  *
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include "access/genam.h"
      18                 : #include "access/relscan.h"
      19                 : #include "access/tableam.h"
      20                 : #include "access/transam.h"
      21                 : #include "access/xact.h"
      22                 : #include "commands/trigger.h"
      23                 : #include "executor/executor.h"
      24                 : #include "executor/nodeModifyTable.h"
      25                 : #include "nodes/nodeFuncs.h"
      26                 : #include "parser/parse_relation.h"
      27                 : #include "parser/parsetree.h"
      28                 : #include "replication/logicalrelation.h"
      29                 : #include "storage/bufmgr.h"
      30                 : #include "storage/lmgr.h"
      31                 : #include "utils/builtins.h"
      32                 : #include "utils/datum.h"
      33                 : #include "utils/lsyscache.h"
      34                 : #include "utils/memutils.h"
      35                 : #include "utils/rel.h"
      36                 : #include "utils/snapmgr.h"
      37                 : #include "utils/syscache.h"
      38                 : #include "utils/typcache.h"
      39                 : 
      40                 : 
      41                 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      42                 :                          TypeCacheEntry **eq);
      43                 : 
      44                 : /*
      45                 :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      46                 :  * is setup to match 'rel' (*NOT* idxrel!).
      47                 :  *
      48                 :  * Returns how many columns to use for the index scan.
      49                 :  *
      50                 :  * This is not generic routine, it expects the idxrel to be a btree, non-partial
      51                 :  * and have at least one column reference (i.e. cannot consist of only
      52                 :  * expressions).
      53                 :  *
      54                 :  * By definition, replication identity of a rel meets all limitations associated
      55                 :  * with that. Note that any other index could also meet these limitations.
      56                 :  */
      57                 : static int
      58 GIC       72071 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      59                 :                          TupleTableSlot *searchslot)
      60                 : {
      61                 :     int         index_attoff;
      62 GNC       72071 :     int         skey_attoff = 0;
      63                 :     Datum       indclassDatum;
      64                 :     oidvector  *opclass;
      65 GIC       72071 :     int2vector *indkey = &idxrel->rd_index->indkey;
      66                 : 
      67 GNC       72071 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      68                 :                                            Anum_pg_index_indclass);
      69 GIC       72071 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      70 ECB             : 
      71                 :     /* Build scankey for every non-expression attribute in the index. */
      72 GNC      144155 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      73           72084 :          index_attoff++)
      74                 :     {
      75                 :         Oid         operator;
      76                 :         Oid         optype;
      77 ECB             :         Oid         opfamily;
      78                 :         RegProcedure regop;
      79 GNC       72084 :         int         table_attno = indkey->values[index_attoff];
      80                 : 
      81           72084 :         if (!AttributeNumberIsValid(table_attno))
      82                 :         {
      83                 :             /*
      84                 :              * XXX: Currently, we don't support expressions in the scan key,
      85                 :              * see code below.
      86                 :              */
      87               2 :             continue;
      88                 :         }
      89                 : 
      90                 :         /*
      91 ECB             :          * Load the operator info.  We need this to get the equality operator
      92                 :          * function for the scan key.
      93                 :          */
      94 GNC       72082 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
      95           72082 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
      96                 : 
      97 GIC       72082 :         operator = get_opfamily_member(opfamily, optype,
      98                 :                                        optype,
      99                 :                                        BTEqualStrategyNumber);
     100 CBC       72082 :         if (!OidIsValid(operator))
     101 UIC           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     102                 :                  BTEqualStrategyNumber, optype, optype, opfamily);
     103                 : 
     104 GIC       72082 :         regop = get_opcode(operator);
     105                 : 
     106                 :         /* Initialize the scankey. */
     107 GNC       72082 :         ScanKeyInit(&skey[skey_attoff],
     108           72082 :                     index_attoff + 1,
     109                 :                     BTEqualStrategyNumber,
     110 ECB             :                     regop,
     111 GNC       72082 :                     searchslot->tts_values[table_attno - 1]);
     112                 : 
     113           72082 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     114 EUB             : 
     115                 :         /* Check for null value. */
     116 GNC       72082 :         if (searchslot->tts_isnull[table_attno - 1])
     117               1 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     118                 : 
     119           72082 :         skey_attoff++;
     120 ECB             :     }
     121                 : 
     122                 :     /* There must always be at least one attribute for the index scan. */
     123 GNC       72071 :     Assert(skey_attoff > 0);
     124                 : 
     125           72071 :     return skey_attoff;
     126 ECB             : }
     127                 : 
     128                 : /*
     129                 :  * Search the relation 'rel' for tuple using the index.
     130                 :  *
     131                 :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     132                 :  * contents, and return true.  Return false otherwise.
     133                 :  */
     134                 : bool
     135 GIC       72071 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     136                 :                              LockTupleMode lockmode,
     137                 :                              TupleTableSlot *searchslot,
     138 ECB             :                              TupleTableSlot *outslot)
     139                 : {
     140                 :     ScanKeyData skey[INDEX_MAX_KEYS];
     141                 :     int         skey_attoff;
     142                 :     IndexScanDesc scan;
     143                 :     SnapshotData snap;
     144                 :     TransactionId xwait;
     145                 :     Relation    idxrel;
     146                 :     bool        found;
     147 GNC       72071 :     TypeCacheEntry **eq = NULL;
     148                 :     bool        isIdxSafeToSkipDuplicates;
     149                 : 
     150                 :     /* Open the index. */
     151 GIC       72071 :     idxrel = index_open(idxoid, RowExclusiveLock);
     152                 : 
     153 GNC       72071 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     154                 : 
     155 GIC       72071 :     InitDirtySnapshot(snap);
     156                 : 
     157                 :     /* Build scan key. */
     158 GNC       72071 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     159                 : 
     160                 :     /* Start an index scan. */
     161           72071 :     scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
     162                 : 
     163 UIC           0 : retry:
     164 GIC       72071 :     found = false;
     165                 : 
     166 GNC       72071 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     167                 : 
     168                 :     /* Try to find the tuple */
     169           72071 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     170 ECB             :     {
     171                 :         /*
     172                 :          * Avoid expensive equality check if the index is primary key or
     173                 :          * replica identity index.
     174                 :          */
     175 GNC       72063 :         if (!isIdxSafeToSkipDuplicates)
     176                 :         {
     177              11 :             if (eq == NULL)
     178                 :             {
     179                 : #ifdef USE_ASSERT_CHECKING
     180                 :                 /* apply assertions only once for the input idxoid */
     181              11 :                 IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
     182                 : 
     183              11 :                 Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
     184                 : #endif
     185                 : 
     186              11 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     187                 :             }
     188                 : 
     189              11 :             if (!tuples_equal(outslot, searchslot, eq))
     190 UNC           0 :                 continue;
     191                 :         }
     192                 : 
     193 CBC       72063 :         ExecMaterializeSlot(outslot);
     194                 : 
     195          144126 :         xwait = TransactionIdIsValid(snap.xmin) ?
     196 GIC       72063 :             snap.xmin : snap.xmax;
     197                 : 
     198 ECB             :         /*
     199                 :          * If the tuple is locked, wait for locking transaction to finish and
     200                 :          * retry.
     201                 :          */
     202 GIC       72063 :         if (TransactionIdIsValid(xwait))
     203 EUB             :         {
     204 LBC           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     205 UIC           0 :             goto retry;
     206 ECB             :         }
     207                 : 
     208                 :         /* Found our tuple and it's not locked */
     209 GNC       72063 :         found = true;
     210           72063 :         break;
     211                 :     }
     212                 : 
     213 ECB             :     /* Found tuple, try to lock it in the lockmode. */
     214 GIC       72071 :     if (found)
     215                 :     {
     216                 :         TM_FailureData tmfd;
     217                 :         TM_Result   res;
     218                 : 
     219 CBC       72063 :         PushActiveSnapshot(GetLatestSnapshot());
     220                 : 
     221           72063 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     222                 :                                outslot,
     223                 :                                GetCurrentCommandId(false),
     224                 :                                lockmode,
     225 ECB             :                                LockWaitBlock,
     226                 :                                0 /* don't follow updates */ ,
     227                 :                                &tmfd);
     228                 : 
     229 GIC       72063 :         PopActiveSnapshot();
     230 ECB             : 
     231 GIC       72063 :         switch (res)
     232                 :         {
     233 CBC       72063 :             case TM_Ok:
     234 GBC       72063 :                 break;
     235 UIC           0 :             case TM_Updated:
     236                 :                 /* XXX: Improve handling here */
     237 LBC           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     238 UIC           0 :                     ereport(LOG,
     239 ECB             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     240                 :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     241                 :                 else
     242 UIC           0 :                     ereport(LOG,
     243                 :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     244                 :                              errmsg("concurrent update, retrying")));
     245               0 :                 goto retry;
     246 LBC           0 :             case TM_Deleted:
     247                 :                 /* XXX: Improve handling here */
     248 UBC           0 :                 ereport(LOG,
     249 EUB             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     250                 :                          errmsg("concurrent delete, retrying")));
     251 UIC           0 :                 goto retry;
     252               0 :             case TM_Invisible:
     253 LBC           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     254 ECB             :                 break;
     255 UIC           0 :             default:
     256               0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     257                 :                 break;
     258 ECB             :         }
     259                 :     }
     260                 : 
     261 GIC       72071 :     index_endscan(scan);
     262                 : 
     263 ECB             :     /* Don't release lock until commit. */
     264 GIC       72071 :     index_close(idxrel, NoLock);
     265 ECB             : 
     266 GIC       72071 :     return found;
     267                 : }
     268                 : 
     269                 : /*
     270                 :  * Compare the tuples in the slots by checking if they have equal values.
     271                 :  */
     272                 : static bool
     273 CBC      105276 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     274                 :              TypeCacheEntry **eq)
     275 ECB             : {
     276                 :     int         attrnum;
     277                 : 
     278 CBC      105276 :     Assert(slot1->tts_tupleDescriptor->natts ==
     279 EUB             :            slot2->tts_tupleDescriptor->natts);
     280                 : 
     281 GBC      105276 :     slot_getallattrs(slot1);
     282          105276 :     slot_getallattrs(slot2);
     283                 : 
     284                 :     /* Check equality of the attributes. */
     285 GIC      105455 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     286 EUB             :     {
     287                 :         Form_pg_attribute att;
     288                 :         TypeCacheEntry *typentry;
     289                 : 
     290 GBC      105300 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     291                 : 
     292 EUB             :         /*
     293                 :          * Ignore dropped and generated columns as the publisher doesn't send
     294                 :          * those
     295                 :          */
     296 GBC      105300 :         if (att->attisdropped || att->attgenerated)
     297               2 :             continue;
     298                 : 
     299 EUB             :         /*
     300                 :          * If one value is NULL and other is not, then they are certainly not
     301                 :          * equal
     302                 :          */
     303 GIC      105298 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     304 UIC           0 :             return false;
     305 ECB             : 
     306                 :         /*
     307                 :          * If both are NULL, they can be considered equal.
     308                 :          */
     309 GIC      105298 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     310 CBC           1 :             continue;
     311                 : 
     312 GIC      105297 :         typentry = eq[attrnum];
     313          105297 :         if (typentry == NULL)
     314                 :         {
     315             176 :             typentry = lookup_type_cache(att->atttypid,
     316                 :                                          TYPECACHE_EQ_OPR_FINFO);
     317 CBC         176 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     318 UIC           0 :                 ereport(ERROR,
     319                 :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     320                 :                          errmsg("could not identify an equality operator for type %s",
     321                 :                                 format_type_be(att->atttypid))));
     322 CBC         176 :             eq[attrnum] = typentry;
     323                 :         }
     324                 : 
     325          105297 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     326 ECB             :                                             att->attcollation,
     327 GIC      105297 :                                             slot1->tts_values[attrnum],
     328          105297 :                                             slot2->tts_values[attrnum])))
     329 CBC      105121 :             return false;
     330                 :     }
     331                 : 
     332 GIC         155 :     return true;
     333                 : }
     334 ECB             : 
     335                 : /*
     336                 :  * Search the relation 'rel' for tuple using the sequential scan.
     337                 :  *
     338                 :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     339                 :  * contents, and return true.  Return false otherwise.
     340                 :  *
     341                 :  * Note that this stops on the first matching tuple.
     342                 :  *
     343                 :  * This can obviously be quite slow on tables that have more than few rows.
     344                 :  */
     345                 : bool
     346 GIC         144 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     347 ECB             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     348 EUB             : {
     349                 :     TupleTableSlot *scanslot;
     350                 :     TableScanDesc scan;
     351                 :     SnapshotData snap;
     352                 :     TypeCacheEntry **eq;
     353 ECB             :     TransactionId xwait;
     354                 :     bool        found;
     355 GIC         144 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     356 ECB             : 
     357 CBC         144 :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     358                 : 
     359             144 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     360                 : 
     361 ECB             :     /* Start a heap scan. */
     362 GBC         144 :     InitDirtySnapshot(snap);
     363 GIC         144 :     scan = table_beginscan(rel, &snap, 0, NULL);
     364             144 :     scanslot = table_slot_create(rel, NULL);
     365                 : 
     366 LBC           0 : retry:
     367 GIC         144 :     found = false;
     368                 : 
     369 CBC         144 :     table_rescan(scan, NULL);
     370                 : 
     371 ECB             :     /* Try to find the tuple */
     372 CBC      105265 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     373 ECB             :     {
     374 GIC      105265 :         if (!tuples_equal(scanslot, searchslot, eq))
     375          105121 :             continue;
     376 ECB             : 
     377 GIC         144 :         found = true;
     378             144 :         ExecCopySlot(outslot, scanslot);
     379                 : 
     380             288 :         xwait = TransactionIdIsValid(snap.xmin) ?
     381             144 :             snap.xmin : snap.xmax;
     382                 : 
     383                 :         /*
     384                 :          * If the tuple is locked, wait for locking transaction to finish and
     385                 :          * retry.
     386                 :          */
     387             144 :         if (TransactionIdIsValid(xwait))
     388                 :         {
     389 UIC           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     390 LBC           0 :             goto retry;
     391                 :         }
     392                 : 
     393                 :         /* Found our tuple and it's not locked */
     394 GIC         144 :         break;
     395                 :     }
     396                 : 
     397                 :     /* Found tuple, try to lock it in the lockmode. */
     398             144 :     if (found)
     399 ECB             :     {
     400                 :         TM_FailureData tmfd;
     401                 :         TM_Result   res;
     402                 : 
     403 CBC         144 :         PushActiveSnapshot(GetLatestSnapshot());
     404                 : 
     405 GIC         144 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     406 ECB             :                                outslot,
     407                 :                                GetCurrentCommandId(false),
     408                 :                                lockmode,
     409                 :                                LockWaitBlock,
     410 EUB             :                                0 /* don't follow updates */ ,
     411 ECB             :                                &tmfd);
     412                 : 
     413 CBC         144 :         PopActiveSnapshot();
     414                 : 
     415 GIC         144 :         switch (res)
     416 ECB             :         {
     417 GIC         144 :             case TM_Ok:
     418 CBC         144 :                 break;
     419 LBC           0 :             case TM_Updated:
     420                 :                 /* XXX: Improve handling here */
     421               0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     422               0 :                     ereport(LOG,
     423                 :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     424 ECB             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     425                 :                 else
     426 UIC           0 :                     ereport(LOG,
     427                 :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     428                 :                              errmsg("concurrent update, retrying")));
     429               0 :                 goto retry;
     430               0 :             case TM_Deleted:
     431 ECB             :                 /* XXX: Improve handling here */
     432 UIC           0 :                 ereport(LOG,
     433 EUB             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     434                 :                          errmsg("concurrent delete, retrying")));
     435 UIC           0 :                 goto retry;
     436               0 :             case TM_Invisible:
     437               0 :                 elog(ERROR, "attempted to lock invisible tuple");
     438 ECB             :                 break;
     439 UIC           0 :             default:
     440               0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     441                 :                 break;
     442 ECB             :         }
     443                 :     }
     444                 : 
     445 GIC         144 :     table_endscan(scan);
     446             144 :     ExecDropSingleTupleTableSlot(scanslot);
     447 ECB             : 
     448 GIC         144 :     return found;
     449 ECB             : }
     450                 : 
     451                 : /*
     452                 :  * Insert tuple represented in the slot to the relation, update the indexes,
     453                 :  * and execute any constraints and per-row triggers.
     454                 :  *
     455                 :  * Caller is responsible for opening the indexes.
     456                 :  */
     457                 : void
     458 GIC       75588 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     459 ECB             :                          EState *estate, TupleTableSlot *slot)
     460                 : {
     461 CBC       75588 :     bool        skip_tuple = false;
     462           75588 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     463 EUB             : 
     464                 :     /* For now we support only tables. */
     465 GBC       75588 :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     466 EUB             : 
     467 GIC       75588 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     468                 : 
     469                 :     /* BEFORE ROW INSERT Triggers */
     470 GBC       75588 :     if (resultRelInfo->ri_TrigDesc &&
     471 GIC          17 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     472                 :     {
     473 GBC           1 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     474               1 :             skip_tuple = true;  /* "do nothing" */
     475                 :     }
     476 EUB             : 
     477 GIC       75588 :     if (!skip_tuple)
     478                 :     {
     479 GBC       75587 :         List       *recheckIndexes = NIL;
     480 EUB             : 
     481                 :         /* Compute stored generated columns */
     482 GIC       75587 :         if (rel->rd_att->constr &&
     483 GBC       45388 :             rel->rd_att->constr->has_generated_stored)
     484 UBC           0 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     485                 :                                        CMD_INSERT);
     486                 : 
     487                 :         /* Check the constraints of the tuple */
     488 GIC       75587 :         if (rel->rd_att->constr)
     489 CBC       45388 :             ExecConstraints(resultRelInfo, slot, estate);
     490           75587 :         if (rel->rd_rel->relispartition)
     491 GIC          60 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     492 ECB             : 
     493                 :         /* OK, store the tuple and create index entries for it */
     494 GIC       75587 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     495                 : 
     496           75587 :         if (resultRelInfo->ri_NumIndices > 0)
     497           55333 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     498                 :                                                    slot, estate, false, false,
     499                 :                                                    NULL, NIL, false);
     500                 : 
     501                 :         /* AFTER ROW INSERT Triggers */
     502 CBC       75578 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     503                 :                              recheckIndexes, NULL);
     504                 : 
     505 ECB             :         /*
     506                 :          * XXX we should in theory pass a TransitionCaptureState object to the
     507                 :          * above to capture transition tuples, but after statement triggers
     508                 :          * don't actually get fired by replication yet anyway
     509                 :          */
     510                 : 
     511 CBC       75578 :         list_free(recheckIndexes);
     512                 :     }
     513 GIC       75579 : }
     514 ECB             : 
     515                 : /*
     516                 :  * Find the searchslot tuple and update it with data in the slot,
     517                 :  * update the indexes, and execute any constraints and per-row triggers.
     518                 :  *
     519                 :  * Caller is responsible for opening the indexes.
     520                 :  */
     521                 : void
     522 GIC       31907 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     523 ECB             :                          EState *estate, EPQState *epqstate,
     524                 :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     525                 : {
     526 CBC       31907 :     bool        skip_tuple = false;
     527           31907 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     528 GBC       31907 :     ItemPointer tid = &(searchslot->tts_tid);
     529                 : 
     530                 :     /* For now we support only tables. */
     531 GIC       31907 :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     532 ECB             : 
     533 CBC       31907 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     534 ECB             : 
     535                 :     /* BEFORE ROW UPDATE Triggers */
     536 GIC       31907 :     if (resultRelInfo->ri_TrigDesc &&
     537               9 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     538 ECB             :     {
     539 GIC           2 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     540                 :                                   tid, NULL, slot, NULL, NULL))
     541 CBC           2 :             skip_tuple = true;  /* "do nothing" */
     542                 :     }
     543                 : 
     544 GIC       31907 :     if (!skip_tuple)
     545                 :     {
     546 CBC       31905 :         List       *recheckIndexes = NIL;
     547                 :         TU_UpdateIndexes update_indexes;
     548                 : 
     549                 :         /* Compute stored generated columns */
     550 GIC       31905 :         if (rel->rd_att->constr &&
     551           31866 :             rel->rd_att->constr->has_generated_stored)
     552               1 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     553                 :                                        CMD_UPDATE);
     554                 : 
     555 ECB             :         /* Check the constraints of the tuple */
     556 GIC       31905 :         if (rel->rd_att->constr)
     557 CBC       31866 :             ExecConstraints(resultRelInfo, slot, estate);
     558 GIC       31905 :         if (rel->rd_rel->relispartition)
     559              11 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     560                 : 
     561           31905 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     562                 :                                   &update_indexes);
     563                 : 
     564 GNC       31905 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     565 GIC       20185 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     566 ECB             :                                                    slot, estate, true, false,
     567                 :                                                    NULL, NIL,
     568                 :                                                    (update_indexes == TU_Summarizing));
     569                 : 
     570                 :         /* AFTER ROW UPDATE Triggers */
     571 CBC       31905 :         ExecARUpdateTriggers(estate, resultRelInfo,
     572 ECB             :                              NULL, NULL,
     573                 :                              tid, NULL, slot,
     574                 :                              recheckIndexes, NULL, false);
     575                 : 
     576 CBC       31905 :         list_free(recheckIndexes);
     577                 :     }
     578           31907 : }
     579                 : 
     580                 : /*
     581 ECB             :  * Find the searchslot tuple and delete it, and execute any constraints
     582                 :  * and per-row triggers.
     583                 :  *
     584                 :  * Caller is responsible for opening the indexes.
     585                 :  */
     586                 : void
     587 GIC       40299 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     588                 :                          EState *estate, EPQState *epqstate,
     589 ECB             :                          TupleTableSlot *searchslot)
     590                 : {
     591 CBC       40299 :     bool        skip_tuple = false;
     592 GIC       40299 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     593           40299 :     ItemPointer tid = &searchslot->tts_tid;
     594                 : 
     595 CBC       40299 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     596 ECB             : 
     597                 :     /* BEFORE ROW DELETE Triggers */
     598 GIC       40299 :     if (resultRelInfo->ri_TrigDesc &&
     599              10 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     600                 :     {
     601 LBC           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     602 UNC           0 :                                            tid, NULL, NULL, NULL, NULL);
     603 ECB             :     }
     604                 : 
     605 GIC       40299 :     if (!skip_tuple)
     606 ECB             :     {
     607                 :         /* OK, delete the tuple */
     608 GIC       40299 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     609 ECB             : 
     610                 :         /* AFTER ROW DELETE Triggers */
     611 GIC       40299 :         ExecARDeleteTriggers(estate, resultRelInfo,
     612                 :                              tid, NULL, NULL, false);
     613                 :     }
     614           40299 : }
     615                 : 
     616 ECB             : /*
     617                 :  * Check if command can be executed with current replica identity.
     618                 :  */
     619                 : void
     620 GIC      218870 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     621 ECB             : {
     622                 :     PublicationDesc pubdesc;
     623                 : 
     624                 :     /*
     625                 :      * Skip checking the replica identity for partitioned tables, because the
     626                 :      * operations are actually performed on the leaf partitions.
     627                 :      */
     628 GIC      218870 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     629          207040 :         return;
     630                 : 
     631                 :     /* We only need to do checks for UPDATE and DELETE. */
     632 CBC      216101 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     633 GIC      129515 :         return;
     634                 : 
     635                 :     /*
     636 ECB             :      * It is only safe to execute UPDATE/DELETE when all columns, referenced
     637                 :      * in the row filters from publications which the relation is in, are
     638                 :      * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
     639                 :      * or the table does not publish UPDATEs or DELETEs.
     640                 :      *
     641                 :      * XXX We could optimize it by first checking whether any of the
     642                 :      * publications have a row filter for this relation. If not and relation
     643                 :      * has replica identity then we can avoid building the descriptor but as
     644                 :      * this happens only one time it doesn't seem worth the additional
     645                 :      * complexity.
     646 EUB             :      */
     647 GBC       86586 :     RelationBuildPublicationDesc(rel, &pubdesc);
     648 GIC       86586 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
     649              30 :         ereport(ERROR,
     650 ECB             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     651                 :                  errmsg("cannot update table \"%s\"",
     652                 :                         RelationGetRelationName(rel)),
     653                 :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     654 GIC       86556 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
     655              54 :         ereport(ERROR,
     656 ECB             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     657                 :                  errmsg("cannot update table \"%s\"",
     658                 :                         RelationGetRelationName(rel)),
     659                 :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     660 GIC       86502 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
     661 UIC           0 :         ereport(ERROR,
     662                 :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     663                 :                  errmsg("cannot delete from table \"%s\"",
     664                 :                         RelationGetRelationName(rel)),
     665 ECB             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     666 GIC       86502 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
     667 UIC           0 :         ereport(ERROR,
     668                 :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     669                 :                  errmsg("cannot delete from table \"%s\"",
     670                 :                         RelationGetRelationName(rel)),
     671                 :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     672                 : 
     673 ECB             :     /* If relation has replica identity we are always good. */
     674 CBC       86502 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
     675 GIC       74556 :         return;
     676                 : 
     677 ECB             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
     678 CBC       11946 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     679 GIC         200 :         return;
     680                 : 
     681                 :     /*
     682                 :      * This is UPDATE/DELETE and there is no replica identity.
     683                 :      *
     684                 :      * Check if the table publishes UPDATES or DELETES.
     685                 :      */
     686           11746 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
     687              45 :         ereport(ERROR,
     688                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     689                 :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     690                 :                         RelationGetRelationName(rel)),
     691                 :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     692 CBC       11701 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
     693 LBC           0 :         ereport(ERROR,
     694 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     695                 :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     696                 :                         RelationGetRelationName(rel)),
     697                 :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     698                 : }
     699                 : 
     700                 : 
     701                 : /*
     702                 :  * Check if we support writing into specific relkind.
     703                 :  *
     704                 :  * The nspname and relname are only needed for error reporting.
     705                 :  */
     706 EUB             : void
     707 GIC         741 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     708                 :                          const char *relname)
     709                 : {
     710             741 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     711 LBC           0 :         ereport(ERROR,
     712 EUB             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     713                 :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     714                 :                         nspname, relname),
     715                 :                  errdetail_relkind_not_supported(relkind)));
     716 GIC         741 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a