LCOV - differential code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 94.3 % 174 164 1 6 3 2 84 11 67 3 74 2 22
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 11 11 9 2 8 2
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 100.0 % 6 6 6
Legend: Lines: hit not hit (240..) days: 94.0 % 168 158 1 6 3 2 84 5 67 3 74
Function coverage date bins:
(240..) days: 57.9 % 19 11 9 2 8

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * pg_subscription.c
                                  4                 :  *      replication subscriptions
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :  *
                                  9                 :  * IDENTIFICATION
                                 10                 :  *      src/backend/catalog/pg_subscription.c
                                 11                 :  *
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #include "postgres.h"
                                 16                 : 
                                 17                 : #include "access/genam.h"
                                 18                 : #include "access/heapam.h"
                                 19                 : #include "access/htup_details.h"
                                 20                 : #include "access/tableam.h"
                                 21                 : #include "access/xact.h"
                                 22                 : #include "catalog/indexing.h"
                                 23                 : #include "catalog/pg_subscription.h"
                                 24                 : #include "catalog/pg_subscription_rel.h"
                                 25                 : #include "catalog/pg_type.h"
                                 26                 : #include "miscadmin.h"
                                 27                 : #include "nodes/makefuncs.h"
                                 28                 : #include "storage/lmgr.h"
                                 29                 : #include "utils/array.h"
                                 30                 : #include "utils/builtins.h"
                                 31                 : #include "utils/fmgroids.h"
                                 32                 : #include "utils/lsyscache.h"
                                 33                 : #include "utils/pg_lsn.h"
                                 34                 : #include "utils/rel.h"
                                 35                 : #include "utils/syscache.h"
                                 36                 : 
                                 37                 : static List *textarray_to_stringlist(ArrayType *textarray);
                                 38                 : 
                                 39                 : /*
                                 40                 :  * Fetch the subscription from the syscache.
                                 41                 :  */
                                 42                 : Subscription *
 2271 peter_e                    43 CBC         564 : GetSubscription(Oid subid, bool missing_ok)
                                 44                 : {
                                 45                 :     HeapTuple   tup;
                                 46                 :     Subscription *sub;
                                 47                 :     Form_pg_subscription subform;
                                 48                 :     Datum       datum;
                                 49                 :     bool        isnull;
                                 50                 : 
                                 51             564 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
                                 52                 : 
                                 53             564 :     if (!HeapTupleIsValid(tup))
                                 54                 :     {
 2271 peter_e                    55 UBC           0 :         if (missing_ok)
                                 56               0 :             return NULL;
                                 57                 : 
                                 58               0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
                                 59                 :     }
                                 60                 : 
 2271 peter_e                    61 CBC         564 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
                                 62                 : 
                                 63             564 :     sub = (Subscription *) palloc(sizeof(Subscription));
                                 64             564 :     sub->oid = subid;
                                 65             564 :     sub->dbid = subform->subdbid;
  367 akapila                    66             564 :     sub->skiplsn = subform->subskiplsn;
 2271 peter_e                    67             564 :     sub->name = pstrdup(NameStr(subform->subname));
                                 68             564 :     sub->owner = subform->subowner;
                                 69             564 :     sub->enabled = subform->subenabled;
  995 tgl                        70             564 :     sub->binary = subform->subbinary;
  948 akapila                    71             564 :     sub->stream = subform->substream;
  634                            72             564 :     sub->twophasestate = subform->subtwophasestate;
  391                            73             564 :     sub->disableonerr = subform->subdisableonerr;
   10 rhaas                      74 GNC         564 :     sub->passwordrequired = subform->subpasswordrequired;
    5                            75             564 :     sub->runasowner = subform->subrunasowner;
 2271 peter_e                    76 ECB             : 
                                 77                 :     /* Get conninfo */
   15 dgustafsson                78 GNC         564 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
                                 79                 :                                    tup,
                                 80                 :                                    Anum_pg_subscription_subconninfo);
 2186 peter_e                    81 CBC         564 :     sub->conninfo = TextDatumGetCString(datum);
                                 82                 : 
                                 83                 :     /* Get slotname */
 2271                            84             564 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
                                 85                 :                             tup,
                                 86                 :                             Anum_pg_subscription_subslotname,
                                 87                 :                             &isnull);
 2161                            88             564 :     if (!isnull)
                                 89             531 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
                                 90                 :     else
                                 91              33 :         sub->slotname = NULL;
                                 92                 : 
                                 93                 :     /* Get synccommit */
   15 dgustafsson                94 GNC         564 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
                                 95                 :                                    tup,
                                 96                 :                                    Anum_pg_subscription_subsynccommit);
 2186 peter_e                    97 GIC         564 :     sub->synccommit = TextDatumGetCString(datum);
 2186 peter_e                    98 ECB             : 
                                 99                 :     /* Get publications */
   15 dgustafsson               100 GNC         564 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
                                101                 :                                    tup,
                                102                 :                                    Anum_pg_subscription_subpublications);
 2271 peter_e                   103 GIC         564 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
                                104                 : 
                                105                 :     /* Get origin */
   15 dgustafsson               106 GNC         564 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
                                107                 :                                    tup,
                                108                 :                                    Anum_pg_subscription_suborigin);
  262 akapila                   109             564 :     sub->origin = TextDatumGetCString(datum);
                                110                 : 
 2271 peter_e                   111 CBC         564 :     ReleaseSysCache(tup);
                                112                 : 
                                113             564 :     return sub;
                                114                 : }
 2271 peter_e                   115 ECB             : 
                                116                 : /*
                                117                 :  * Return number of subscriptions defined in given database.
                                118                 :  * Used by dropdb() to check if database can indeed be dropped.
                                119                 :  */
                                120                 : int
 2271 peter_e                   121 GIC          20 : CountDBSubscriptions(Oid dbid)
                                122                 : {
 2153 bruce                     123 CBC          20 :     int         nsubs = 0;
                                124                 :     Relation    rel;
 2153 bruce                     125 ECB             :     ScanKeyData scankey;
                                126                 :     SysScanDesc scan;
                                127                 :     HeapTuple   tup;
                                128                 : 
 1539 andres                    129 GIC          20 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
                                130                 : 
 2271 peter_e                   131 CBC          20 :     ScanKeyInit(&scankey,
                                132                 :                 Anum_pg_subscription_subdbid,
 2271 peter_e                   133 ECB             :                 BTEqualStrategyNumber, F_OIDEQ,
                                134                 :                 ObjectIdGetDatum(dbid));
                                135                 : 
 2271 peter_e                   136 GIC          20 :     scan = systable_beginscan(rel, InvalidOid, false,
                                137                 :                               NULL, 1, &scankey);
 2271 peter_e                   138 ECB             : 
 2271 peter_e                   139 GIC          20 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
 2271 peter_e                   140 UIC           0 :         nsubs++;
 2271 peter_e                   141 ECB             : 
 2271 peter_e                   142 GBC          20 :     systable_endscan(scan);
                                143                 : 
 1539 andres                    144 CBC          20 :     table_close(rel, NoLock);
                                145                 : 
 2271 peter_e                   146              20 :     return nsubs;
                                147                 : }
 2271 peter_e                   148 ECB             : 
                                149                 : /*
                                150                 :  * Free memory allocated by subscription struct.
                                151                 :  */
                                152                 : void
 2271 peter_e                   153 GIC          26 : FreeSubscription(Subscription *sub)
                                154                 : {
 2271 peter_e                   155 CBC          26 :     pfree(sub->name);
 2271 peter_e                   156 GIC          26 :     pfree(sub->conninfo);
 2161 peter_e                   157 CBC          26 :     if (sub->slotname)
                                158              26 :         pfree(sub->slotname);
 2271                           159              26 :     list_free_deep(sub->publications);
                                160              26 :     pfree(sub);
                                161              26 : }
 2271 peter_e                   162 ECB             : 
  391 akapila                   163                 : /*
                                164                 :  * Disable the given subscription.
                                165                 :  */
                                166                 : void
  391 akapila                   167 GIC           4 : DisableSubscription(Oid subid)
                                168                 : {
  391 akapila                   169 ECB             :     Relation    rel;
                                170                 :     bool        nulls[Natts_pg_subscription];
                                171                 :     bool        replaces[Natts_pg_subscription];
                                172                 :     Datum       values[Natts_pg_subscription];
                                173                 :     HeapTuple   tup;
                                174                 : 
                                175                 :     /* Look up the subscription in the catalog */
  391 akapila                   176 GIC           4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
                                177               4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
  391 akapila                   178 ECB             : 
  391 akapila                   179 CBC           4 :     if (!HeapTupleIsValid(tup))
  391 akapila                   180 UIC           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
  391 akapila                   181 ECB             : 
  391 akapila                   182 GBC           4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
                                183                 : 
  391 akapila                   184 ECB             :     /* Form a new tuple. */
  391 akapila                   185 GIC           4 :     memset(values, 0, sizeof(values));
                                186               4 :     memset(nulls, false, sizeof(nulls));
  391 akapila                   187 CBC           4 :     memset(replaces, false, sizeof(replaces));
  391 akapila                   188 ECB             : 
                                189                 :     /* Set the subscription to disabled. */
  391 akapila                   190 GIC           4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
                                191               4 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
  391 akapila                   192 ECB             : 
                                193                 :     /* Update the catalog */
  391 akapila                   194 GIC           4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
                                195                 :                             replaces);
  391 akapila                   196 CBC           4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
  391 akapila                   197 GIC           4 :     heap_freetuple(tup);
  391 akapila                   198 ECB             : 
  391 akapila                   199 CBC           4 :     table_close(rel, NoLock);
  391 akapila                   200 GIC           4 : }
  391 akapila                   201 ECB             : 
                                202                 : /*
 2271 peter_e                   203                 :  * Convert text array to list of strings.
                                204                 :  *
                                205                 :  * Note: the resulting list of strings is pallocated here.
                                206                 :  */
                                207                 : static List *
 2271 peter_e                   208 CBC         564 : textarray_to_stringlist(ArrayType *textarray)
 2271 peter_e                   209 EUB             : {
                                210                 :     Datum      *elems;
 2153 bruce                     211 ECB             :     int         nelems,
                                212                 :                 i;
 2153 bruce                     213 CBC         564 :     List       *res = NIL;
                                214                 : 
  282 peter                     215 GNC         564 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
 2271 peter_e                   216 ECB             : 
 2271 peter_e                   217 GIC         564 :     if (nelems == 0)
 2271 peter_e                   218 UIC           0 :         return NIL;
 2271 peter_e                   219 ECB             : 
 2271 peter_e                   220 CBC        1458 :     for (i = 0; i < nelems; i++)
 2186 peter_e                   221 GIC         894 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
                                222                 : 
 2271                           223             564 :     return res;
                                224                 : }
                                225                 : 
 2208 peter_e                   226 ECB             : /*
                                227                 :  * Add new state record for a subscription table.
                                228                 :  */
                                229                 : void
 1829 peter_e                   230 GIC         157 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
                                231                 :                         XLogRecPtr sublsn)
                                232                 : {
                                233                 :     Relation    rel;
                                234                 :     HeapTuple   tup;
 2208 peter_e                   235 ECB             :     bool        nulls[Natts_pg_subscription_rel];
                                236                 :     Datum       values[Natts_pg_subscription_rel];
                                237                 : 
 2106 peter_e                   238 GIC         157 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
                                239                 : 
 1539 andres                    240 CBC         157 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
                                241                 : 
                                242                 :     /* Try finding existing mapping. */
 2208 peter_e                   243             157 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 2208 peter_e                   244 EUB             :                               ObjectIdGetDatum(relid),
                                245                 :                               ObjectIdGetDatum(subid));
 1829 peter_e                   246 GIC         157 :     if (HeapTupleIsValid(tup))
 1829 peter_e                   247 UIC           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
 1829 peter_e                   248 ECB             :              relid, subid);
                                249                 : 
                                250                 :     /* Form the tuple. */
 1829 peter_e                   251 GIC         157 :     memset(values, 0, sizeof(values));
 1829 peter_e                   252 CBC         157 :     memset(nulls, false, sizeof(nulls));
                                253             157 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
 1829 peter_e                   254 GIC         157 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
 1829 peter_e                   255 CBC         157 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
                                256             157 :     if (sublsn != InvalidXLogRecPtr)
 1829 peter_e                   257 LBC           0 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
                                258                 :     else
 1829 peter_e                   259 CBC         157 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
                                260                 : 
                                261             157 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
                                262                 : 
                                263                 :     /* Insert tuple into catalog. */
 1601 andres                    264 GIC         157 :     CatalogTupleInsert(rel, tup);
 2208 peter_e                   265 ECB             : 
 1829 peter_e                   266 GIC         157 :     heap_freetuple(tup);
                                267                 : 
 1829 peter_e                   268 ECB             :     /* Cleanup. */
 1539 andres                    269 CBC         157 :     table_close(rel, NoLock);
 1829 peter_e                   270 GIC         157 : }
                                271                 : 
                                272                 : /*
                                273                 :  * Update the state of a subscription table.
                                274                 :  */
                                275                 : void
                                276             596 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 1829 peter_e                   277 ECB             :                            XLogRecPtr sublsn)
                                278                 : {
                                279                 :     Relation    rel;
                                280                 :     HeapTuple   tup;
                                281                 :     bool        nulls[Natts_pg_subscription_rel];
                                282                 :     Datum       values[Natts_pg_subscription_rel];
                                283                 :     bool        replaces[Natts_pg_subscription_rel];
                                284                 : 
 1829 peter_e                   285 GIC         596 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
                                286                 : 
 1539 andres                    287             596 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
                                288                 : 
 1829 peter_e                   289 ECB             :     /* Try finding existing mapping. */
 1829 peter_e                   290 GIC         596 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
                                291                 :                               ObjectIdGetDatum(relid),
 1829 peter_e                   292 ECB             :                               ObjectIdGetDatum(subid));
 1829 peter_e                   293 GIC         596 :     if (!HeapTupleIsValid(tup))
 1829 peter_e                   294 UIC           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
                                295                 :              relid, subid);
 1829 peter_e                   296 ECB             : 
                                297                 :     /* Update the tuple. */
 1829 peter_e                   298 CBC         596 :     memset(values, 0, sizeof(values));
                                299             596 :     memset(nulls, false, sizeof(nulls));
                                300             596 :     memset(replaces, false, sizeof(replaces));
                                301                 : 
 1829 peter_e                   302 GIC         596 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
                                303             596 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
 1829 peter_e                   304 ECB             : 
 1829 peter_e                   305 GIC         596 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
                                306             596 :     if (sublsn != InvalidXLogRecPtr)
 1829 peter_e                   307 CBC         293 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
                                308                 :     else
                                309             303 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 1829 peter_e                   310 ECB             : 
 1829 peter_e                   311 GIC         596 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 1829 peter_e                   312 ECB             :                             replaces);
                                313                 : 
                                314                 :     /* Update the catalog. */
 1829 peter_e                   315 CBC         596 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                                316                 : 
 2208 peter_e                   317 ECB             :     /* Cleanup. */
 1539 andres                    318 GIC         596 :     table_close(rel, NoLock);
 2208 peter_e                   319 CBC         596 : }
                                320                 : 
                                321                 : /*
                                322                 :  * Get state of subscription table.
                                323                 :  *
                                324                 :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
                                325                 :  */
                                326                 : char
  906 alvherre                  327             948 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
                                328                 : {
                                329                 :     HeapTuple   tup;
                                330                 :     char        substate;
                                331                 :     bool        isnull;
                                332                 :     Datum       d;
  786 akapila                   333 ECB             :     Relation    rel;
                                334                 : 
                                335                 :     /*
                                336                 :      * This is to avoid the race condition with AlterSubscription which tries
                                337                 :      * to remove this relstate.
                                338                 :      */
  786 akapila                   339 CBC         948 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
                                340                 : 
                                341                 :     /* Try finding the mapping. */
 2208 peter_e                   342 GIC         948 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
                                343                 :                           ObjectIdGetDatum(relid),
                                344                 :                           ObjectIdGetDatum(subid));
                                345                 : 
 2208 peter_e                   346 CBC         948 :     if (!HeapTupleIsValid(tup))
                                347                 :     {
  773 akapila                   348              24 :         table_close(rel, AccessShareLock);
  906 alvherre                  349 GIC          24 :         *sublsn = InvalidXLogRecPtr;
                                350              24 :         return SUBREL_STATE_UNKNOWN;
                                351                 :     }
                                352                 : 
                                353                 :     /* Get the state. */
                                354             924 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
                                355                 : 
  906 alvherre                  356 ECB             :     /* Get the LSN */
 2208 peter_e                   357 CBC         924 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
                                358                 :                         Anum_pg_subscription_rel_srsublsn, &isnull);
 2208 peter_e                   359 GIC         924 :     if (isnull)
                                360             480 :         *sublsn = InvalidXLogRecPtr;
 2208 peter_e                   361 ECB             :     else
 2208 peter_e                   362 GIC         444 :         *sublsn = DatumGetLSN(d);
                                363                 : 
                                364                 :     /* Cleanup */
                                365             924 :     ReleaseSysCache(tup);
                                366                 : 
  786 akapila                   367             924 :     table_close(rel, AccessShareLock);
                                368                 : 
 2208 peter_e                   369             924 :     return substate;
 2208 peter_e                   370 ECB             : }
                                371                 : 
 2208 peter_e                   372 EUB             : /*
                                373                 :  * Drop subscription relation mapping. These can be for a particular
                                374                 :  * subscription, or for a particular relation, or both.
                                375                 :  */
                                376                 : void
 2208 peter_e                   377 GIC       19265 : RemoveSubscriptionRel(Oid subid, Oid relid)
                                378                 : {
                                379                 :     Relation    rel;
                                380                 :     TableScanDesc scan;
                                381                 :     ScanKeyData skey[2];
                                382                 :     HeapTuple   tup;
                                383           19265 :     int         nkeys = 0;
                                384                 : 
 1539 andres                    385           19265 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
                                386                 : 
 2208 peter_e                   387           19265 :     if (OidIsValid(subid))
 2208 peter_e                   388 ECB             :     {
 2208 peter_e                   389 GIC          99 :         ScanKeyInit(&skey[nkeys++],
 2208 peter_e                   390 ECB             :                     Anum_pg_subscription_rel_srsubid,
                                391                 :                     BTEqualStrategyNumber,
                                392                 :                     F_OIDEQ,
                                393                 :                     ObjectIdGetDatum(subid));
                                394                 :     }
                                395                 : 
 2208 peter_e                   396 GIC       19265 :     if (OidIsValid(relid))
                                397                 :     {
                                398           19196 :         ScanKeyInit(&skey[nkeys++],
                                399                 :                     Anum_pg_subscription_rel_srrelid,
                                400                 :                     BTEqualStrategyNumber,
                                401                 :                     F_OIDEQ,
 2208 peter_e                   402 ECB             :                     ObjectIdGetDatum(relid));
                                403                 :     }
                                404                 : 
                                405                 :     /* Do the search and delete what we found. */
 1490 andres                    406 GIC       19265 :     scan = table_beginscan_catalog(rel, nkeys, skey);
 2208 peter_e                   407           19345 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
                                408                 :     {
  786 akapila                   409 ECB             :         Form_pg_subscription_rel subrel;
                                410                 : 
  786 akapila                   411 CBC          80 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
                                412                 : 
                                413                 :         /*
                                414                 :          * We don't allow to drop the relation mapping when the table
                                415                 :          * synchronization is in progress unless the caller updates the
  786 akapila                   416 ECB             :          * corresponding subscription as well. This is to ensure that we don't
                                417                 :          * leave tablesync slots or origins in the system when the
                                418                 :          * corresponding table is dropped.
                                419                 :          */
  786 akapila                   420 CBC          80 :         if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
                                421                 :         {
  786 akapila                   422 UIC           0 :             ereport(ERROR,
  786 akapila                   423 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                424                 :                      errmsg("could not drop relation mapping for subscription \"%s\"",
                                425                 :                             get_subscription_name(subrel->srsubid, false)),
                                426                 :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
                                427                 :                                get_rel_name(relid), subrel->srsubstate),
                                428                 : 
                                429                 :             /*
                                430                 :              * translator: first %s is a SQL ALTER command and second %s is a
                                431                 :              * SQL DROP command
                                432                 :              */
                                433                 :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
                                434                 :                              "ALTER SUBSCRIPTION ... ENABLE",
                                435                 :                              "DROP SUBSCRIPTION ...")));
                                436                 :         }
                                437                 : 
 2125 tgl                       438 GIC          80 :         CatalogTupleDelete(rel, &tup->t_self);
 2208 peter_e                   439 ECB             :     }
 1490 andres                    440 GIC       19265 :     table_endscan(scan);
                                441                 : 
 1539 andres                    442 CBC       19265 :     table_close(rel, RowExclusiveLock);
 2208 peter_e                   443 GIC       19265 : }
                                444                 : 
                                445                 : /*
  634 akapila                   446 ECB             :  * Does the subscription have any relations?
                                447                 :  *
                                448                 :  * Use this function only to know true/false, and when you have no need for the
                                449                 :  * List returned by GetSubscriptionRelations.
                                450                 :  */
                                451                 : bool
  634 akapila                   452 GIC         167 : HasSubscriptionRelations(Oid subid)
  634 akapila                   453 ECB             : {
                                454                 :     Relation    rel;
                                455                 :     ScanKeyData skey[1];
                                456                 :     SysScanDesc scan;
                                457                 :     bool        has_subrels;
                                458                 : 
  634 akapila                   459 CBC         167 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
                                460                 : 
  634 akapila                   461 GIC         167 :     ScanKeyInit(&skey[0],
  634 akapila                   462 ECB             :                 Anum_pg_subscription_rel_srsubid,
                                463                 :                 BTEqualStrategyNumber, F_OIDEQ,
                                464                 :                 ObjectIdGetDatum(subid));
                                465                 : 
  634 akapila                   466 GIC         167 :     scan = systable_beginscan(rel, InvalidOid, false,
                                467                 :                               NULL, 1, skey);
                                468                 : 
  634 akapila                   469 ECB             :     /* If even a single tuple exists then the subscription has tables. */
  634 akapila                   470 GIC         167 :     has_subrels = HeapTupleIsValid(systable_getnext(scan));
  634 akapila                   471 ECB             : 
                                472                 :     /* Cleanup */
  634 akapila                   473 CBC         167 :     systable_endscan(scan);
                                474             167 :     table_close(rel, AccessShareLock);
                                475                 : 
                                476             167 :     return has_subrels;
  634 akapila                   477 ECB             : }
                                478                 : 
 2208 peter_e                   479                 : /*
                                480                 :  * Get the relations for the subscription.
                                481                 :  *
                                482                 :  * If not_ready is true, return only the relations that are not in a ready
                                483                 :  * state, otherwise return all the relations of the subscription.  The
                                484                 :  * returned list is palloc'ed in the current memory context.
                                485                 :  */
                                486                 : List *
  256 michael                   487 GNC         771 : GetSubscriptionRelations(Oid subid, bool not_ready)
                                488                 : {
 2208 peter_e                   489 GIC         771 :     List       *res = NIL;
                                490                 :     Relation    rel;
                                491                 :     HeapTuple   tup;
                                492             771 :     int         nkeys = 0;
                                493                 :     ScanKeyData skey[2];
                                494                 :     SysScanDesc scan;
                                495                 : 
 1539 andres                    496             771 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
                                497                 : 
 2208 peter_e                   498             771 :     ScanKeyInit(&skey[nkeys++],
                                499                 :                 Anum_pg_subscription_rel_srsubid,
                                500                 :                 BTEqualStrategyNumber, F_OIDEQ,
                                501                 :                 ObjectIdGetDatum(subid));
                                502                 : 
  256 michael                   503 GNC         771 :     if (not_ready)
                                504             733 :         ScanKeyInit(&skey[nkeys++],
                                505                 :                     Anum_pg_subscription_rel_srsubstate,
                                506                 :                     BTEqualStrategyNumber, F_CHARNE,
                                507                 :                     CharGetDatum(SUBREL_STATE_READY));
                                508                 : 
 2208 peter_e                   509 GIC         771 :     scan = systable_beginscan(rel, InvalidOid, false,
                                510                 :                               NULL, nkeys, skey);
                                511                 : 
                                512            2056 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
                                513                 :     {
                                514                 :         Form_pg_subscription_rel subrel;
                                515                 :         SubscriptionRelState *relstate;
                                516                 :         Datum       d;
                                517                 :         bool        isnull;
                                518                 : 
                                519            1285 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
                                520                 : 
 2153 bruce                     521            1285 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
 2208 peter_e                   522            1285 :         relstate->relid = subrel->srrelid;
                                523            1285 :         relstate->state = subrel->srsubstate;
  993 tgl                       524            1285 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
                                525                 :                             Anum_pg_subscription_rel_srsublsn, &isnull);
                                526            1285 :         if (isnull)
                                527            1044 :             relstate->lsn = InvalidXLogRecPtr;
                                528                 :         else
                                529             241 :             relstate->lsn = DatumGetLSN(d);
                                530                 : 
 2208 peter_e                   531            1285 :         res = lappend(res, relstate);
                                532                 :     }
                                533                 : 
                                534                 :     /* Cleanup */
                                535             771 :     systable_endscan(scan);
 1539 andres                    536             771 :     table_close(rel, AccessShareLock);
                                537                 : 
 2208 peter_e                   538             771 :     return res;
                                539                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a