LCOV - differential code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 81.9 % 227 186 2 12 25 2 15 97 33 41 24 120 13
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 9 9 9 7 2
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 94.7 % 38 36 2 3 33 2
Legend: Lines: hit not hit (180,240] days: 100.0 % 2 2 2
(240..) days: 79.1 % 187 148 12 25 2 12 95 41 20 110
Function coverage date bins:
(240..) days: 60.0 % 15 9 9 6

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * execReplication.c
                                  4                 :  *    miscellaneous executor routines for logical replication
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :  *
                                  9                 :  * IDENTIFICATION
                                 10                 :  *    src/backend/executor/execReplication.c
                                 11                 :  *
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #include "postgres.h"
                                 16                 : 
                                 17                 : #include "access/genam.h"
                                 18                 : #include "access/relscan.h"
                                 19                 : #include "access/tableam.h"
                                 20                 : #include "access/transam.h"
                                 21                 : #include "access/xact.h"
                                 22                 : #include "commands/trigger.h"
                                 23                 : #include "executor/executor.h"
                                 24                 : #include "executor/nodeModifyTable.h"
                                 25                 : #include "nodes/nodeFuncs.h"
                                 26                 : #include "parser/parse_relation.h"
                                 27                 : #include "parser/parsetree.h"
                                 28                 : #include "replication/logicalrelation.h"
                                 29                 : #include "storage/bufmgr.h"
                                 30                 : #include "storage/lmgr.h"
                                 31                 : #include "utils/builtins.h"
                                 32                 : #include "utils/datum.h"
                                 33                 : #include "utils/lsyscache.h"
                                 34                 : #include "utils/memutils.h"
                                 35                 : #include "utils/rel.h"
                                 36                 : #include "utils/snapmgr.h"
                                 37                 : #include "utils/syscache.h"
                                 38                 : #include "utils/typcache.h"
                                 39                 : 
                                 40                 : 
                                 41                 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
                                 42                 :                          TypeCacheEntry **eq);
                                 43                 : 
                                 44                 : /*
                                 45                 :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
                                 46                 :  * is setup to match 'rel' (*NOT* idxrel!).
                                 47                 :  *
                                 48                 :  * Returns how many columns to use for the index scan.
                                 49                 :  *
                                 50                 :  * This is not generic routine, it expects the idxrel to be a btree, non-partial
                                 51                 :  * and have at least one column reference (i.e. cannot consist of only
                                 52                 :  * expressions).
                                 53                 :  *
                                 54                 :  * By definition, replication identity of a rel meets all limitations associated
                                 55                 :  * with that. Note that any other index could also meet these limitations.
                                 56                 :  */
                                 57                 : static int
 2271 peter_e                    58 GIC       72071 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
                                 59                 :                          TupleTableSlot *searchslot)
                                 60                 : {
                                 61                 :     int         index_attoff;
   25 akapila                    62 GNC       72071 :     int         skey_attoff = 0;
                                 63                 :     Datum       indclassDatum;
                                 64                 :     oidvector  *opclass;
 2271 peter_e                    65 GIC       72071 :     int2vector *indkey = &idxrel->rd_index->indkey;
                                 66                 : 
   15 dgustafsson                67 GNC       72071 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
                                 68                 :                                            Anum_pg_index_indclass);
 2271 peter_e                    69 GIC       72071 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
 2271 peter_e                    70 ECB             : 
                                 71                 :     /* Build scankey for every non-expression attribute in the index. */
   25 akapila                    72 GNC      144155 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
                                 73           72084 :          index_attoff++)
                                 74                 :     {
                                 75                 :         Oid         operator;
                                 76                 :         Oid         optype;
 2271 peter_e                    77 ECB             :         Oid         opfamily;
                                 78                 :         RegProcedure regop;
   25 akapila                    79 GNC       72084 :         int         table_attno = indkey->values[index_attoff];
                                 80                 : 
                                 81           72084 :         if (!AttributeNumberIsValid(table_attno))
                                 82                 :         {
                                 83                 :             /*
                                 84                 :              * XXX: Currently, we don't support expressions in the scan key,
                                 85                 :              * see code below.
                                 86                 :              */
                                 87               2 :             continue;
                                 88                 :         }
                                 89                 : 
                                 90                 :         /*
 2271 peter_e                    91 ECB             :          * Load the operator info.  We need this to get the equality operator
                                 92                 :          * function for the scan key.
                                 93                 :          */
   25 akapila                    94 GNC       72082 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
                                 95           72082 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
                                 96                 : 
 2271 peter_e                    97 GIC       72082 :         operator = get_opfamily_member(opfamily, optype,
                                 98                 :                                        optype,
                                 99                 :                                        BTEqualStrategyNumber);
 2271 peter_e                   100 CBC       72082 :         if (!OidIsValid(operator))
 2085 tgl                       101 UIC           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
                                102                 :                  BTEqualStrategyNumber, optype, optype, opfamily);
                                103                 : 
 2271 peter_e                   104 GIC       72082 :         regop = get_opcode(operator);
                                105                 : 
                                106                 :         /* Initialize the scankey. */
   25 akapila                   107 GNC       72082 :         ScanKeyInit(&skey[skey_attoff],
                                108           72082 :                     index_attoff + 1,
                                109                 :                     BTEqualStrategyNumber,
 2271 peter_e                   110 ECB             :                     regop,
   25 akapila                   111 GNC       72082 :                     searchslot->tts_values[table_attno - 1]);
                                112                 : 
                                113           72082 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 1479 peter                     114 EUB             : 
                                115                 :         /* Check for null value. */
   25 akapila                   116 GNC       72082 :         if (searchslot->tts_isnull[table_attno - 1])
                                117               1 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
                                118                 : 
                                119           72082 :         skey_attoff++;
 2271 peter_e                   120 ECB             :     }
                                121                 : 
                                122                 :     /* There must always be at least one attribute for the index scan. */
   25 akapila                   123 GNC       72071 :     Assert(skey_attoff > 0);
                                124                 : 
                                125           72071 :     return skey_attoff;
 2271 peter_e                   126 ECB             : }
                                127                 : 
                                128                 : /*
                                129                 :  * Search the relation 'rel' for tuple using the index.
                                130                 :  *
                                131                 :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
                                132                 :  * contents, and return true.  Return false otherwise.
                                133                 :  */
                                134                 : bool
 2271 peter_e                   135 GIC       72071 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
                                136                 :                              LockTupleMode lockmode,
                                137                 :                              TupleTableSlot *searchslot,
 2271 peter_e                   138 ECB             :                              TupleTableSlot *outslot)
                                139                 : {
 2153 bruce                     140                 :     ScanKeyData skey[INDEX_MAX_KEYS];
                                141                 :     int         skey_attoff;
                                142                 :     IndexScanDesc scan;
                                143                 :     SnapshotData snap;
                                144                 :     TransactionId xwait;
                                145                 :     Relation    idxrel;
                                146                 :     bool        found;
   25 akapila                   147 GNC       72071 :     TypeCacheEntry **eq = NULL;
                                148                 :     bool        isIdxSafeToSkipDuplicates;
                                149                 : 
                                150                 :     /* Open the index. */
 2271 peter_e                   151 GIC       72071 :     idxrel = index_open(idxoid, RowExclusiveLock);
                                152                 : 
   25 akapila                   153 GNC       72071 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
                                154                 : 
 2271 peter_e                   155 GIC       72071 :     InitDirtySnapshot(snap);
                                156                 : 
                                157                 :     /* Build scan key. */
   25 akapila                   158 GNC       72071 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
                                159                 : 
                                160                 :     /* Start an index scan. */
                                161           72071 :     scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
                                162                 : 
 2271 peter_e                   163 UIC           0 : retry:
 2271 peter_e                   164 GIC       72071 :     found = false;
                                165                 : 
   25 akapila                   166 GNC       72071 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
                                167                 : 
                                168                 :     /* Try to find the tuple */
                                169           72071 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 2271 peter_e                   170 ECB             :     {
                                171                 :         /*
                                172                 :          * Avoid expensive equality check if the index is primary key or
                                173                 :          * replica identity index.
                                174                 :          */
   25 akapila                   175 GNC       72063 :         if (!isIdxSafeToSkipDuplicates)
                                176                 :         {
                                177              11 :             if (eq == NULL)
                                178                 :             {
                                179                 : #ifdef USE_ASSERT_CHECKING
                                180                 :                 /* apply assertions only once for the input idxoid */
                                181              11 :                 IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
                                182                 : 
                                183              11 :                 Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
                                184                 : #endif
                                185                 : 
                                186              11 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
                                187                 :             }
                                188                 : 
                                189              11 :             if (!tuples_equal(outslot, searchslot, eq))
   25 akapila                   190 UNC           0 :                 continue;
                                191                 :         }
                                192                 : 
 2271 peter_e                   193 CBC       72063 :         ExecMaterializeSlot(outslot);
                                194                 : 
                                195          144126 :         xwait = TransactionIdIsValid(snap.xmin) ?
 2271 peter_e                   196 GIC       72063 :             snap.xmin : snap.xmax;
                                197                 : 
 2271 peter_e                   198 ECB             :         /*
                                199                 :          * If the tuple is locked, wait for locking transaction to finish and
                                200                 :          * retry.
                                201                 :          */
 2271 peter_e                   202 GIC       72063 :         if (TransactionIdIsValid(xwait))
 2271 peter_e                   203 EUB             :         {
 2271 peter_e                   204 LBC           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 2271 peter_e                   205 UIC           0 :             goto retry;
 2271 peter_e                   206 ECB             :         }
                                207                 : 
                                208                 :         /* Found our tuple and it's not locked */
   25 akapila                   209 GNC       72063 :         found = true;
                                210           72063 :         break;
                                211                 :     }
                                212                 : 
 2271 peter_e                   213 ECB             :     /* Found tuple, try to lock it in the lockmode. */
 2271 peter_e                   214 GIC       72071 :     if (found)
                                215                 :     {
                                216                 :         TM_FailureData tmfd;
                                217                 :         TM_Result   res;
                                218                 : 
 2271 peter_e                   219 CBC       72063 :         PushActiveSnapshot(GetLatestSnapshot());
                                220                 : 
 1417 andres                    221           72063 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
                                222                 :                                outslot,
                                223                 :                                GetCurrentCommandId(false),
                                224                 :                                lockmode,
 1478 andres                    225 ECB             :                                LockWaitBlock,
                                226                 :                                0 /* don't follow updates */ ,
                                227                 :                                &tmfd);
                                228                 : 
 2271 peter_e                   229 GIC       72063 :         PopActiveSnapshot();
 2271 peter_e                   230 ECB             : 
 2271 peter_e                   231 GIC       72063 :         switch (res)
                                232                 :         {
 1478 andres                    233 CBC       72063 :             case TM_Ok:
 2271 peter_e                   234 GBC       72063 :                 break;
 1478 andres                    235 UIC           0 :             case TM_Updated:
                                236                 :                 /* XXX: Improve handling here */
 1478 andres                    237 LBC           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
 1828 andres                    238 UIC           0 :                     ereport(LOG,
 1828 andres                    239 ECB             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                240                 :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
                                241                 :                 else
 1828 andres                    242 UIC           0 :                     ereport(LOG,
                                243                 :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                244                 :                              errmsg("concurrent update, retrying")));
 2271 peter_e                   245               0 :                 goto retry;
 1478 andres                    246 LBC           0 :             case TM_Deleted:
                                247                 :                 /* XXX: Improve handling here */
 1478 andres                    248 UBC           0 :                 ereport(LOG,
 1478 andres                    249 EUB             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                250                 :                          errmsg("concurrent delete, retrying")));
 1478 andres                    251 UIC           0 :                 goto retry;
                                252               0 :             case TM_Invisible:
 2271 peter_e                   253 LBC           0 :                 elog(ERROR, "attempted to lock invisible tuple");
 1804 tgl                       254 ECB             :                 break;
 2271 peter_e                   255 UIC           0 :             default:
 1417 andres                    256               0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
                                257                 :                 break;
 2271 peter_e                   258 ECB             :         }
                                259                 :     }
                                260                 : 
 2271 peter_e                   261 GIC       72071 :     index_endscan(scan);
                                262                 : 
 2271 peter_e                   263 ECB             :     /* Don't release lock until commit. */
 2271 peter_e                   264 GIC       72071 :     index_close(idxrel, NoLock);
 2271 peter_e                   265 ECB             : 
 2271 peter_e                   266 GIC       72071 :     return found;
                                267                 : }
                                268                 : 
                                269                 : /*
                                270                 :  * Compare the tuples in the slots by checking if they have equal values.
                                271                 :  */
                                272                 : static bool
 1093 noah                      273 CBC      105276 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
                                274                 :              TypeCacheEntry **eq)
 2271 peter_e                   275 ECB             : {
                                276                 :     int         attrnum;
                                277                 : 
 1490 andres                    278 CBC      105276 :     Assert(slot1->tts_tupleDescriptor->natts ==
 1490 andres                    279 EUB             :            slot2->tts_tupleDescriptor->natts);
                                280                 : 
 1490 andres                    281 GBC      105276 :     slot_getallattrs(slot1);
                                282          105276 :     slot_getallattrs(slot2);
                                283                 : 
                                284                 :     /* Check equality of the attributes. */
 1490 andres                    285 GIC      105455 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
 2271 peter_e                   286 EUB             :     {
                                287                 :         Form_pg_attribute att;
                                288                 :         TypeCacheEntry *typentry;
 2116                           289                 : 
   19 akapila                   290 GBC      105300 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
                                291                 : 
   19 akapila                   292 EUB             :         /*
                                293                 :          * Ignore dropped and generated columns as the publisher doesn't send
                                294                 :          * those
                                295                 :          */
   17 akapila                   296 GBC      105300 :         if (att->attisdropped || att->attgenerated)
   19                           297               2 :             continue;
                                298                 : 
 2271 peter_e                   299 EUB             :         /*
                                300                 :          * If one value is NULL and other is not, then they are certainly not
                                301                 :          * equal
                                302                 :          */
 1490 andres                    303 GIC      105298 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
 2271 peter_e                   304 UIC           0 :             return false;
 2271 peter_e                   305 ECB             : 
                                306                 :         /*
                                307                 :          * If both are NULL, they can be considered equal.
                                308                 :          */
 1490 andres                    309 GIC      105298 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
 2271 peter_e                   310 CBC           1 :             continue;
                                311                 : 
 1093 noah                      312 GIC      105297 :         typentry = eq[attrnum];
                                313          105297 :         if (typentry == NULL)
                                314                 :         {
                                315             176 :             typentry = lookup_type_cache(att->atttypid,
                                316                 :                                          TYPECACHE_EQ_OPR_FINFO);
 1093 noah                      317 CBC         176 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
 1093 noah                      318 UIC           0 :                 ereport(ERROR,
                                319                 :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
                                320                 :                          errmsg("could not identify an equality operator for type %s",
                                321                 :                                 format_type_be(att->atttypid))));
 1093 noah                      322 CBC         176 :             eq[attrnum] = typentry;
                                323                 :         }
                                324                 : 
 1479 peter                     325          105297 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
 1479 peter                     326 ECB             :                                             att->attcollation,
 1418 tgl                       327 GIC      105297 :                                             slot1->tts_values[attrnum],
                                328          105297 :                                             slot2->tts_values[attrnum])))
 2271 peter_e                   329 CBC      105121 :             return false;
                                330                 :     }
                                331                 : 
 2271 peter_e                   332 GIC         155 :     return true;
                                333                 : }
 2271 peter_e                   334 ECB             : 
                                335                 : /*
                                336                 :  * Search the relation 'rel' for tuple using the sequential scan.
                                337                 :  *
                                338                 :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
                                339                 :  * contents, and return true.  Return false otherwise.
                                340                 :  *
                                341                 :  * Note that this stops on the first matching tuple.
                                342                 :  *
                                343                 :  * This can obviously be quite slow on tables that have more than few rows.
                                344                 :  */
                                345                 : bool
 2271 peter_e                   346 GIC         144 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 2271 peter_e                   347 ECB             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
 2271 peter_e                   348 EUB             : {
                                349                 :     TupleTableSlot *scanslot;
                                350                 :     TableScanDesc scan;
                                351                 :     SnapshotData snap;
                                352                 :     TypeCacheEntry **eq;
 2153 bruce                     353 ECB             :     TransactionId xwait;
                                354                 :     bool        found;
 1490 andres                    355 GIC         144 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
 2271 peter_e                   356 ECB             : 
 2271 peter_e                   357 CBC         144 :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
                                358                 : 
 1093 noah                      359             144 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
                                360                 : 
 1945 peter_e                   361 ECB             :     /* Start a heap scan. */
 2271 peter_e                   362 GBC         144 :     InitDirtySnapshot(snap);
 1490 andres                    363 GIC         144 :     scan = table_beginscan(rel, &snap, 0, NULL);
                                364             144 :     scanslot = table_slot_create(rel, NULL);
                                365                 : 
 2271 peter_e                   366 LBC           0 : retry:
 2271 peter_e                   367 GIC         144 :     found = false;
                                368                 : 
 1490 andres                    369 CBC         144 :     table_rescan(scan, NULL);
                                370                 : 
 2271 peter_e                   371 ECB             :     /* Try to find the tuple */
 1490 andres                    372 CBC      105265 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
 2271 peter_e                   373 ECB             :     {
 1093 noah                      374 GIC      105265 :         if (!tuples_equal(scanslot, searchslot, eq))
 2271 peter_e                   375          105121 :             continue;
 2271 peter_e                   376 ECB             : 
 2271 peter_e                   377 GIC         144 :         found = true;
 1490 andres                    378             144 :         ExecCopySlot(outslot, scanslot);
                                379                 : 
 2271 peter_e                   380             288 :         xwait = TransactionIdIsValid(snap.xmin) ?
                                381             144 :             snap.xmin : snap.xmax;
                                382                 : 
                                383                 :         /*
                                384                 :          * If the tuple is locked, wait for locking transaction to finish and
                                385                 :          * retry.
                                386                 :          */
                                387             144 :         if (TransactionIdIsValid(xwait))
                                388                 :         {
 2271 peter_e                   389 UIC           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 2271 peter_e                   390 LBC           0 :             goto retry;
                                391                 :         }
                                392                 : 
                                393                 :         /* Found our tuple and it's not locked */
 1161 alvherre                  394 GIC         144 :         break;
                                395                 :     }
                                396                 : 
                                397                 :     /* Found tuple, try to lock it in the lockmode. */
 2271 peter_e                   398             144 :     if (found)
 2271 peter_e                   399 ECB             :     {
                                400                 :         TM_FailureData tmfd;
 1478 andres                    401                 :         TM_Result   res;
                                402                 : 
 2271 peter_e                   403 CBC         144 :         PushActiveSnapshot(GetLatestSnapshot());
                                404                 : 
 1417 andres                    405 GIC         144 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
 1478 andres                    406 ECB             :                                outslot,
                                407                 :                                GetCurrentCommandId(false),
                                408                 :                                lockmode,
                                409                 :                                LockWaitBlock,
 1478 andres                    410 EUB             :                                0 /* don't follow updates */ ,
 1478 andres                    411 ECB             :                                &tmfd);
                                412                 : 
 2271 peter_e                   413 CBC         144 :         PopActiveSnapshot();
                                414                 : 
 2271 peter_e                   415 GIC         144 :         switch (res)
 2271 peter_e                   416 ECB             :         {
 1478 andres                    417 GIC         144 :             case TM_Ok:
 2271 peter_e                   418 CBC         144 :                 break;
 1478 andres                    419 LBC           0 :             case TM_Updated:
                                420                 :                 /* XXX: Improve handling here */
                                421               0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
 1828                           422               0 :                     ereport(LOG,
                                423                 :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 1828 andres                    424 ECB             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
                                425                 :                 else
 1828 andres                    426 UIC           0 :                     ereport(LOG,
                                427                 :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                428                 :                              errmsg("concurrent update, retrying")));
 2271 peter_e                   429               0 :                 goto retry;
 1478 andres                    430               0 :             case TM_Deleted:
 1478 andres                    431 ECB             :                 /* XXX: Improve handling here */
 1478 andres                    432 UIC           0 :                 ereport(LOG,
 1478 andres                    433 EUB             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                434                 :                          errmsg("concurrent delete, retrying")));
 1478 andres                    435 UIC           0 :                 goto retry;
                                436               0 :             case TM_Invisible:
 2271 peter_e                   437               0 :                 elog(ERROR, "attempted to lock invisible tuple");
 1804 tgl                       438 ECB             :                 break;
 2271 peter_e                   439 UIC           0 :             default:
 1417 andres                    440               0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
                                441                 :                 break;
 2271 peter_e                   442 ECB             :         }
                                443                 :     }
                                444                 : 
 1490 andres                    445 GIC         144 :     table_endscan(scan);
                                446             144 :     ExecDropSingleTupleTableSlot(scanslot);
 2271 peter_e                   447 ECB             : 
 2271 peter_e                   448 GIC         144 :     return found;
 2271 peter_e                   449 ECB             : }
                                450                 : 
                                451                 : /*
                                452                 :  * Insert tuple represented in the slot to the relation, update the indexes,
                                453                 :  * and execute any constraints and per-row triggers.
                                454                 :  *
                                455                 :  * Caller is responsible for opening the indexes.
                                456                 :  */
                                457                 : void
  907 heikki.linnakangas        458 GIC       75588 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
  907 heikki.linnakangas        459 ECB             :                          EState *estate, TupleTableSlot *slot)
                                460                 : {
 2153 bruce                     461 CBC       75588 :     bool        skip_tuple = false;
                                462           75588 :     Relation    rel = resultRelInfo->ri_RelationDesc;
 2271 peter_e                   463 EUB             : 
                                464                 :     /* For now we support only tables. */
 2271 peter_e                   465 GBC       75588 :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
 2271 peter_e                   466 EUB             : 
 2271 peter_e                   467 GIC       75588 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
                                468                 : 
                                469                 :     /* BEFORE ROW INSERT Triggers */
 2271 peter_e                   470 GBC       75588 :     if (resultRelInfo->ri_TrigDesc &&
 2271 peter_e                   471 GIC          17 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
                                472                 :     {
 1503 andres                    473 GBC           1 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
 1418 tgl                       474               1 :             skip_tuple = true;  /* "do nothing" */
                                475                 :     }
 2271 peter_e                   476 EUB             : 
 2271 peter_e                   477 GIC       75588 :     if (!skip_tuple)
                                478                 :     {
 2271 peter_e                   479 GBC       75587 :         List       *recheckIndexes = NIL;
 2271 peter_e                   480 EUB             : 
 1471 peter                     481                 :         /* Compute stored generated columns */
 1471 peter                     482 GIC       75587 :         if (rel->rd_att->constr &&
 1471 peter                     483 GBC       45388 :             rel->rd_att->constr->has_generated_stored)
  907 heikki.linnakangas        484 UBC           0 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
                                485                 :                                        CMD_INSERT);
                                486                 : 
                                487                 :         /* Check the constraints of the tuple */
 2271 peter_e                   488 GIC       75587 :         if (rel->rd_att->constr)
 1763 alvherre                  489 CBC       45388 :             ExecConstraints(resultRelInfo, slot, estate);
  935 tgl                       490           75587 :         if (rel->rd_rel->relispartition)
 1763 alvherre                  491 GIC          60 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
 2271 peter_e                   492 ECB             : 
                                493                 :         /* OK, store the tuple and create index entries for it */
 1417 andres                    494 GIC       75587 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
                                495                 : 
 2271 peter_e                   496           75587 :         if (resultRelInfo->ri_NumIndices > 0)
  907 heikki.linnakangas        497           55333 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
                                498                 :                                                    slot, estate, false, false,
                                499                 :                                                    NULL, NIL, false);
                                500                 : 
                                501                 :         /* AFTER ROW INSERT Triggers */
 1503 andres                    502 CBC       75578 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
                                503                 :                              recheckIndexes, NULL);
                                504                 : 
 2111 rhodiumtoad               505 ECB             :         /*
                                506                 :          * XXX we should in theory pass a TransitionCaptureState object to the
                                507                 :          * above to capture transition tuples, but after statement triggers
                                508                 :          * don't actually get fired by replication yet anyway
                                509                 :          */
                                510                 : 
 2271 peter_e                   511 CBC       75578 :         list_free(recheckIndexes);
                                512                 :     }
 2271 peter_e                   513 GIC       75579 : }
 2271 peter_e                   514 ECB             : 
                                515                 : /*
                                516                 :  * Find the searchslot tuple and update it with data in the slot,
                                517                 :  * update the indexes, and execute any constraints and per-row triggers.
                                518                 :  *
                                519                 :  * Caller is responsible for opening the indexes.
                                520                 :  */
                                521                 : void
  907 heikki.linnakangas        522 GIC       31907 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
  907 heikki.linnakangas        523 ECB             :                          EState *estate, EPQState *epqstate,
                                524                 :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
                                525                 : {
 2153 bruce                     526 CBC       31907 :     bool        skip_tuple = false;
                                527           31907 :     Relation    rel = resultRelInfo->ri_RelationDesc;
 1478 andres                    528 GBC       31907 :     ItemPointer tid = &(searchslot->tts_tid);
                                529                 : 
                                530                 :     /* For now we support only tables. */
 2271 peter_e                   531 GIC       31907 :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
 2271 peter_e                   532 ECB             : 
 2271 peter_e                   533 CBC       31907 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 2271 peter_e                   534 ECB             : 
 2006 rhaas                     535                 :     /* BEFORE ROW UPDATE Triggers */
 2271 peter_e                   536 GIC       31907 :     if (resultRelInfo->ri_TrigDesc &&
                                537               9 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
 2271 peter_e                   538 ECB             :     {
 1503 andres                    539 GIC           2 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
                                540                 :                                   tid, NULL, slot, NULL, NULL))
 1418 tgl                       541 CBC           2 :             skip_tuple = true;  /* "do nothing" */
                                542                 :     }
                                543                 : 
 2271 peter_e                   544 GIC       31907 :     if (!skip_tuple)
                                545                 :     {
 2271 peter_e                   546 CBC       31905 :         List       *recheckIndexes = NIL;
                                547                 :         TU_UpdateIndexes update_indexes;
                                548                 : 
                                549                 :         /* Compute stored generated columns */
 1471 peter                     550 GIC       31905 :         if (rel->rd_att->constr &&
                                551           31866 :             rel->rd_att->constr->has_generated_stored)
  907 heikki.linnakangas        552               1 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
                                553                 :                                        CMD_UPDATE);
                                554                 : 
 2271 peter_e                   555 ECB             :         /* Check the constraints of the tuple */
 2271 peter_e                   556 GIC       31905 :         if (rel->rd_att->constr)
 1763 alvherre                  557 CBC       31866 :             ExecConstraints(resultRelInfo, slot, estate);
  935 tgl                       558 GIC       31905 :         if (rel->rd_rel->relispartition)
 1763 alvherre                  559              11 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
                                560                 : 
 1417 andres                    561           31905 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
                                562                 :                                   &update_indexes);
                                563                 : 
   20 tomas.vondra              564 GNC       31905 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
  907 heikki.linnakangas        565 GIC       20185 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
  816 pg                        566 ECB             :                                                    slot, estate, true, false,
                                567                 :                                                    NULL, NIL,
                                568                 :                                                    (update_indexes == TU_Summarizing));
                                569                 : 
                                570                 :         /* AFTER ROW UPDATE Triggers */
 2271 peter_e                   571 CBC       31905 :         ExecARUpdateTriggers(estate, resultRelInfo,
  385 alvherre                  572 ECB             :                              NULL, NULL,
 1478 andres                    573                 :                              tid, NULL, slot,
                                574                 :                              recheckIndexes, NULL, false);
                                575                 : 
 2271 peter_e                   576 CBC       31905 :         list_free(recheckIndexes);
                                577                 :     }
                                578           31907 : }
                                579                 : 
                                580                 : /*
 2271 peter_e                   581 ECB             :  * Find the searchslot tuple and delete it, and execute any constraints
                                582                 :  * and per-row triggers.
                                583                 :  *
                                584                 :  * Caller is responsible for opening the indexes.
                                585                 :  */
                                586                 : void
  907 heikki.linnakangas        587 GIC       40299 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
                                588                 :                          EState *estate, EPQState *epqstate,
 2271 peter_e                   589 ECB             :                          TupleTableSlot *searchslot)
                                590                 : {
 2153 bruce                     591 CBC       40299 :     bool        skip_tuple = false;
 2153 bruce                     592 GIC       40299 :     Relation    rel = resultRelInfo->ri_RelationDesc;
 1478 andres                    593           40299 :     ItemPointer tid = &searchslot->tts_tid;
                                594                 : 
 2271 peter_e                   595 CBC       40299 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
 2271 peter_e                   596 ECB             : 
 2006 rhaas                     597                 :     /* BEFORE ROW DELETE Triggers */
 2271 peter_e                   598 GIC       40299 :     if (resultRelInfo->ri_TrigDesc &&
 2005 rhaas                     599              10 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
                                600                 :     {
 2271 peter_e                   601 LBC           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
   27 dean.a.rasheed            602 UNC           0 :                                            tid, NULL, NULL, NULL, NULL);
 2271 peter_e                   603 ECB             :     }
                                604                 : 
 2271 peter_e                   605 GIC       40299 :     if (!skip_tuple)
 2271 peter_e                   606 ECB             :     {
                                607                 :         /* OK, delete the tuple */
 1417 andres                    608 GIC       40299 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
 2271 peter_e                   609 ECB             : 
                                610                 :         /* AFTER ROW DELETE Triggers */
 2271 peter_e                   611 GIC       40299 :         ExecARDeleteTriggers(estate, resultRelInfo,
                                612                 :                              tid, NULL, NULL, false);
                                613                 :     }
                                614           40299 : }
                                615                 : 
 2271 peter_e                   616 ECB             : /*
                                617                 :  * Check if command can be executed with current replica identity.
                                618                 :  */
                                619                 : void
 2271 peter_e                   620 GIC      218870 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
 2271 peter_e                   621 ECB             : {
                                622                 :     PublicationDesc pubdesc;
                                623                 : 
                                624                 :     /*
                                625                 :      * Skip checking the replica identity for partitioned tables, because the
                                626                 :      * operations are actually performed on the leaf partitions.
                                627                 :      */
  236 akapila                   628 GIC      218870 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
                                629          207040 :         return;
                                630                 : 
                                631                 :     /* We only need to do checks for UPDATE and DELETE. */
 2271 peter_e                   632 CBC      216101 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
 2271 peter_e                   633 GIC      129515 :         return;
                                634                 : 
                                635                 :     /*
  411 akapila                   636 ECB             :      * It is only safe to execute UPDATE/DELETE when all columns, referenced
                                637                 :      * in the row filters from publications which the relation is in, are
                                638                 :      * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
                                639                 :      * or the table does not publish UPDATEs or DELETEs.
                                640                 :      *
                                641                 :      * XXX We could optimize it by first checking whether any of the
                                642                 :      * publications have a row filter for this relation. If not and relation
                                643                 :      * has replica identity then we can avoid building the descriptor but as
                                644                 :      * this happens only one time it doesn't seem worth the additional
                                645                 :      * complexity.
  411 akapila                   646 EUB             :      */
  411 akapila                   647 GBC       86586 :     RelationBuildPublicationDesc(rel, &pubdesc);
  411 akapila                   648 GIC       86586 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
                                649              30 :         ereport(ERROR,
  411 akapila                   650 ECB             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                                651                 :                  errmsg("cannot update table \"%s\"",
                                652                 :                         RelationGetRelationName(rel)),
                                653                 :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
  379 tomas.vondra              654 GIC       86556 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
                                655              54 :         ereport(ERROR,
  379 tomas.vondra              656 ECB             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                                657                 :                  errmsg("cannot update table \"%s\"",
                                658                 :                         RelationGetRelationName(rel)),
                                659                 :                  errdetail("Column list used by the publication does not cover the replica identity.")));
  411 akapila                   660 GIC       86502 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
  411 akapila                   661 UIC           0 :         ereport(ERROR,
                                662                 :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                                663                 :                  errmsg("cannot delete from table \"%s\"",
                                664                 :                         RelationGetRelationName(rel)),
  411 akapila                   665 ECB             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
  379 tomas.vondra              666 GIC       86502 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
  379 tomas.vondra              667 UIC           0 :         ereport(ERROR,
                                668                 :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                                669                 :                  errmsg("cannot delete from table \"%s\"",
                                670                 :                         RelationGetRelationName(rel)),
                                671                 :                  errdetail("Column list used by the publication does not cover the replica identity.")));
                                672                 : 
 2271 peter_e                   673 ECB             :     /* If relation has replica identity we are always good. */
  411 akapila                   674 CBC       86502 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
 2271 peter_e                   675 GIC       74556 :         return;
                                676                 : 
  379 tomas.vondra              677 ECB             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
  379 tomas.vondra              678 CBC       11946 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
  379 tomas.vondra              679 GIC         200 :         return;
                                680                 : 
                                681                 :     /*
                                682                 :      * This is UPDATE/DELETE and there is no replica identity.
                                683                 :      *
                                684                 :      * Check if the table publishes UPDATES or DELETES.
                                685                 :      */
  411 akapila                   686           11746 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
 2271 peter_e                   687              45 :         ereport(ERROR,
                                688                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                689                 :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
                                690                 :                         RelationGetRelationName(rel)),
                                691                 :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
  411 akapila                   692 CBC       11701 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
 2271 peter_e                   693 LBC           0 :         ereport(ERROR,
 2271 peter_e                   694 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                695                 :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
                                696                 :                         RelationGetRelationName(rel)),
                                697                 :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
                                698                 : }
 2154                           699                 : 
                                700                 : 
                                701                 : /*
                                702                 :  * Check if we support writing into specific relkind.
                                703                 :  *
                                704                 :  * The nspname and relname are only needed for error reporting.
                                705                 :  */
 2154 peter_e                   706 EUB             : void
 2154 peter_e                   707 GIC         741 : CheckSubscriptionRelkind(char relkind, const char *nspname,
                                708                 :                          const char *relname)
                                709                 : {
  367 tomas.vondra              710             741 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
 2154 peter_e                   711 LBC           0 :         ereport(ERROR,
 2154 peter_e                   712 EUB             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                713                 :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
                                714                 :                         nspname, relname),
                                715                 :                  errdetail_relkind_not_supported(relkind)));
 2154 peter_e                   716 GIC         741 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a