LCOV - differential code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 92.6 % 651 603 26 12 10 2 392 10 199 36 378
Current Date: 2023-04-08 17:13:01 Functions: 96.7 % 30 29 1 29 1 29
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (120,180] days: 100.0 % 6 6 6
Legend: Lines: hit not hit (180,240] days: 96.8 % 31 30 1 16 4 10 1 13
(240..) days: 92.3 % 614 567 25 12 10 2 376 189 34 365
Function coverage date bins:
(180,240] days: 33.3 % 3 1 1 2
(240..) days: 49.1 % 57 28 1 28 1 27

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * publicationcmds.c
                                  4                 :  *      publication manipulation
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :  *
                                  9                 :  * IDENTIFICATION
                                 10                 :  *      src/backend/commands/publicationcmds.c
                                 11                 :  *
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #include "postgres.h"
                                 16                 : 
                                 17                 : #include "access/genam.h"
                                 18                 : #include "access/htup_details.h"
                                 19                 : #include "access/table.h"
                                 20                 : #include "access/xact.h"
                                 21                 : #include "catalog/catalog.h"
                                 22                 : #include "catalog/indexing.h"
                                 23                 : #include "catalog/namespace.h"
                                 24                 : #include "catalog/objectaccess.h"
                                 25                 : #include "catalog/objectaddress.h"
                                 26                 : #include "catalog/partition.h"
                                 27                 : #include "catalog/pg_database.h"
                                 28                 : #include "catalog/pg_inherits.h"
                                 29                 : #include "catalog/pg_namespace.h"
                                 30                 : #include "catalog/pg_proc.h"
                                 31                 : #include "catalog/pg_publication.h"
                                 32                 : #include "catalog/pg_publication_namespace.h"
                                 33                 : #include "catalog/pg_publication_rel.h"
                                 34                 : #include "catalog/pg_type.h"
                                 35                 : #include "commands/dbcommands.h"
                                 36                 : #include "commands/defrem.h"
                                 37                 : #include "commands/event_trigger.h"
                                 38                 : #include "commands/publicationcmds.h"
                                 39                 : #include "funcapi.h"
                                 40                 : #include "miscadmin.h"
                                 41                 : #include "nodes/nodeFuncs.h"
                                 42                 : #include "parser/parse_clause.h"
                                 43                 : #include "parser/parse_collate.h"
                                 44                 : #include "parser/parse_relation.h"
                                 45                 : #include "storage/lmgr.h"
                                 46                 : #include "utils/acl.h"
                                 47                 : #include "utils/array.h"
                                 48                 : #include "utils/builtins.h"
                                 49                 : #include "utils/catcache.h"
                                 50                 : #include "utils/fmgroids.h"
                                 51                 : #include "utils/inval.h"
                                 52                 : #include "utils/lsyscache.h"
                                 53                 : #include "utils/rel.h"
                                 54                 : #include "utils/syscache.h"
                                 55                 : #include "utils/varlena.h"
                                 56                 : 
                                 57                 : 
                                 58                 : /*
                                 59                 :  * Information used to validate the columns in the row filter expression. See
                                 60                 :  * contain_invalid_rfcolumn_walker for details.
                                                    :  *
                                                    :  * relid and parentid are consulted only when pubviaroot is true, to map a
                                                    :  * column number of parentid onto the same-named column of relid.
                                 61                 :  */
                                 62                 : typedef struct rf_context
                                 63                 : {
                                 64                 :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
                                 65                 :     bool        pubviaroot;     /* true if we are validating the parent
                                 66                 :                                  * relation's row filter */
                                 67                 :     Oid         relid;          /* relid of the relation being validated */
                                 68                 :     Oid         parentid;       /* relid of the relation the row filter is read from */
                                 69                 : } rf_context;
                                 70                 : /* Prototypes for file-local (static) helper functions. */
                                 71                 : static List *OpenTableList(List *tables);
                                 72                 : static void CloseTableList(List *rels);
                                 73                 : static void LockSchemaList(List *schemalist);
                                 74                 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
                                 75                 :                                  AlterPublicationStmt *stmt);
                                 76                 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
                                 77                 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
                                 78                 :                                   AlterPublicationStmt *stmt);
                                 79                 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
                                 80                 : 
                                 81                 : /*
                                                    :  * Parse a list of publication options (DefElem nodes) into the
                                                    :  * caller-supplied output variables.
                                                    :  *
                                                    :  * *publish_given and *publish_via_partition_root_given are set to true
                                                    :  * only when the corresponding option appears in the list.
                                                    :  *
                                                    :  * pubactions: all four actions (insert, update, delete, truncate) are true
                                                    :  * unless "publish" is given, in which case only the listed actions are.
                                                    :  *
                                                    :  * *publish_via_partition_root is false unless the option is given.
                                                    :  *
                                                    :  * pstate is only handed to errorConflictingDefElem() for a repeated option.
                                                    :  *
                                                    :  * Reports ERROR for an unrecognized option name, an unrecognized "publish"
                                                    :  * value, or a "publish" string rejected by SplitIdentifierString().
                                                    :  */
                                 82                 : static void
  633 dean.a.rasheed             83 GIC         374 : parse_publication_options(ParseState *pstate,
  633 dean.a.rasheed             84 ECB             :                           List *options,
                                 85                 :                           bool *publish_given,
                                 86                 :                           PublicationActions *pubactions,
                                 87                 :                           bool *publish_via_partition_root_given,
                                 88                 :                           bool *publish_via_partition_root)
                                 89                 : {
                                 90                 :     ListCell   *lc;
                                 91                 : 
 2158 peter_e                    92 GIC         374 :     *publish_given = false;
 1096 peter                      93 CBC         374 :     *publish_via_partition_root_given = false;
 2271 peter_e                    94 ECB             : 
                                 95                 :     /* defaults: publish all four actions, not via partition root */
 1096 peter                      96 GIC         374 :     pubactions->pubinsert = true;
 1096 peter                      97 CBC         374 :     pubactions->pubupdate = true;
                                 98             374 :     pubactions->pubdelete = true;
                                 99             374 :     pubactions->pubtruncate = true;
                                100             374 :     *publish_via_partition_root = false;
 2271 peter_e                   101 ECB             : 
                                102                 :     /* Parse options; a repeated option goes to errorConflictingDefElem() */
 2153 bruce                     103 GIC         509 :     foreach(lc, options)
 2271 peter_e                   104 ECB             :     {
 2271 peter_e                   105 GIC         144 :         DefElem    *defel = (DefElem *) lfirst(lc);
 2271 peter_e                   106 ECB             : 
 2158 peter_e                   107 GIC         144 :         if (strcmp(defel->defname, "publish") == 0)
 2271 peter_e                   108 ECB             :         {
                                109                 :             char       *publish;
                                110                 :             List       *publish_list;
                                111                 :             ListCell   *lc2;
                                112                 : 
 2158 peter_e                   113 GIC          60 :             if (*publish_given)
  633 dean.a.rasheed            114 LBC           0 :                 errorConflictingDefElem(defel, pstate);
 2271 peter_e                   115 EUB             : 
                                116                 :             /*
                                117                 :              * If publish option was given only the explicitly listed actions
                                118                 :              * should be published.
                                119                 :              */
 1096 peter                     120 GIC          60 :             pubactions->pubinsert = false;
 1096 peter                     121 CBC          60 :             pubactions->pubupdate = false;
                                122              60 :             pubactions->pubdelete = false;
                                123              60 :             pubactions->pubtruncate = false;
 2271 peter_e                   124 ECB             : 
 2158 peter_e                   125 GIC          60 :             *publish_given = true;
 2158 peter_e                   126 CBC          60 :             publish = defGetString(defel);
 2271 peter_e                   127 ECB             : 
 2158 peter_e                   128 GIC          60 :             if (!SplitIdentifierString(publish, ',', &publish_list))
 2271 peter_e                   129 LBC           0 :                 ereport(ERROR,
 2271 peter_e                   130 EUB             :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                131                 :                          errmsg("invalid list syntax in parameter \"%s\"",
                                132                 :                                 "publish")));
                                133                 : 
                                134                 :             /* Turn on each action named in the comma-separated list. */
  186 drowley                   135 GNC         130 :             foreach(lc2, publish_list)
 2158 peter_e                   136 ECB             :             {
  186 drowley                   137 GNC          73 :                 char       *publish_opt = (char *) lfirst(lc2);
 2158 peter_e                   138 ECB             : 
 2158 peter_e                   139 GIC          73 :                 if (strcmp(publish_opt, "insert") == 0)
 1096 peter                     140 CBC          53 :                     pubactions->pubinsert = true;
 2158 peter_e                   141              20 :                 else if (strcmp(publish_opt, "update") == 0)
 1096 peter                     142               9 :                     pubactions->pubupdate = true;
 2158 peter_e                   143              11 :                 else if (strcmp(publish_opt, "delete") == 0)
 1096 peter                     144               4 :                     pubactions->pubdelete = true;
 1828 peter_e                   145               7 :                 else if (strcmp(publish_opt, "truncate") == 0)
 1096 peter                     146               4 :                     pubactions->pubtruncate = true;
 2158 peter_e                   147 ECB             :                 else
 2158 peter_e                   148 GIC           3 :                     ereport(ERROR,
 2158 peter_e                   149 ECB             :                             (errcode(ERRCODE_SYNTAX_ERROR),
                                150                 :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
                                151                 :                                     "publish", publish_opt)));
                                152                 :             }
                                153                 :         }
 1096 peter                     154 GIC          84 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
 1096 peter                     155 ECB             :         {
 1096 peter                     156 GIC          81 :             if (*publish_via_partition_root_given)
  633 dean.a.rasheed            157 CBC           3 :                 errorConflictingDefElem(defel, pstate);
 1096 peter                     158              78 :             *publish_via_partition_root_given = true;
                                159              78 :             *publish_via_partition_root = defGetBoolean(defel);
 1096 peter                     160 ECB             :         }
                                161                 :         else
 2158 peter_e                   162 GIC           3 :             ereport(ERROR,
 2158 peter_e                   163 ECB             :                     (errcode(ERRCODE_SYNTAX_ERROR),
                                164                 :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
                                165                 :     }
 2271 peter_e                   166 GIC         365 : }
 2271 peter_e                   167 ECB             : 
                                168                 : /*
                                169                 :  * Convert the PublicationObjSpec list into a schema OID list and a
                                170                 :  * PublicationTable list.
                                                    :  *
                                                    :  * Table entries are appended to *rels as given.  Schema entries are
                                                    :  * resolved to an OID (by name, or as the first entry of the search path
                                                    :  * for the current-schema form) and appended to *schemas without
                                                    :  * duplicates.  Nothing is done when pubobjspec_list is empty.
                                                    :  *
                                                    :  * pstate is not referenced in this function.
                                171                 :  */
                                172                 : static void
  529 akapila                   173 GIC         737 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
  367 tomas.vondra              174 ECB             :                            List **rels, List **schemas)
                                175                 : {
                                176                 :     ListCell   *cell;
                                177                 :     PublicationObjSpec *pubobj;
                                178                 : 
  529 akapila                   179 GIC         737 :     if (!pubobjspec_list)
  529 akapila                   180 CBC          39 :         return;
  529 akapila                   181 ECB             : 
  529 akapila                   182 GIC        1477 :     foreach(cell, pubobjspec_list)
  529 akapila                   183 ECB             :     {
                                184                 :         Oid         schemaid;
                                185                 :         List       *search_path;
                                186                 : 
  529 akapila                   187 GIC         797 :         pubobj = (PublicationObjSpec *) lfirst(cell);
  529 akapila                   188 ECB             : 
  529 akapila                   189 GIC         797 :         switch (pubobj->pubobjtype)
  529 akapila                   190 ECB             :         {
  529 akapila                   191 GIC         619 :             case PUBLICATIONOBJ_TABLE:
  367 tomas.vondra              192 CBC         619 :                 *rels = lappend(*rels, pubobj->pubtable);
  529 akapila                   193             619 :                 break;
  465 alvherre                  194             166 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
  529 akapila                   195             166 :                 schemaid = get_namespace_oid(pubobj->name, false);
  529 akapila                   196 ECB             : 
                                197                 :                 /* Filter out duplicates if user specifies "sch1, sch1" */
  367 tomas.vondra              198 GIC         151 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
  529 akapila                   199 CBC         151 :                 break;
  465 alvherre                  200              12 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
  529 akapila                   201              12 :                 search_path = fetch_search_path(false);
                                202              12 :                 if (search_path == NIL) /* nothing valid in search_path? */
                                203               3 :                     ereport(ERROR,
  529 akapila                   204 ECB             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
                                205                 :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
                                206                 : 
  529 akapila                   207 GIC           9 :                 schemaid = linitial_oid(search_path);
  529 akapila                   208 CBC           9 :                 list_free(search_path);
  529 akapila                   209 ECB             : 
                                210                 :                 /* Filter out duplicates, e.g. the same schema also given by name */
  367 tomas.vondra              211 GIC           9 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
  529 akapila                   212 CBC           9 :                 break;
  529 akapila                   213 LBC           0 :             default:
  529 akapila                   214 EUB             :                 /* shouldn't happen */
  529 akapila                   215 UIC           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
  529 akapila                   216 EUB             :                 break;
                                217                 :         }
                                218                 :     }
                                219                 : }
                                220                 : 
                                221                 : /*
                                222                 :  * Returns true if any of the columns used in the row filter WHERE expression is
                                223                 :  * not part of REPLICA IDENTITY, false otherwise.
                                                    :  *
                                                    :  * Passed to expression_tree_walker() as its callback, with the rf_context
                                                    :  * as the opaque argument.  Each Var is tested against
                                                    :  * context->bms_replident; true is returned as soon as a Var outside that
                                                    :  * set is seen.  A NULL node yields false.
                                224                 :  */
                                225                 : static bool
  411 akapila                   226 GIC         129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
  411 akapila                   227 ECB             : {
  411 akapila                   228 GIC         129 :     if (node == NULL)
  411 akapila                   229 LBC           0 :         return false;
  411 akapila                   230 EUB             : 
  411 akapila                   231 GIC         129 :     if (IsA(node, Var))
  411 akapila                   232 ECB             :     {
  411 akapila                   233 GIC          52 :         Var        *var = (Var *) node;
  411 akapila                   234 CBC          52 :         AttrNumber  attnum = var->varattno;
  411 akapila                   235 ECB             : 
                                236                 :         /*
                                237                 :          * If pubviaroot is true, we are validating the row filter of the
                                238                 :          * parent table, but the bitmap contains the replica identity
                                239                 :          * information of the child table. So, get the column number of the
                                240                 :          * child table as parent and child column order could be different.
                                241                 :          */
  411 akapila                   242 GIC          52 :         if (context->pubviaroot)
  411 akapila                   243 ECB             :         {
  411 akapila                   244 GIC           8 :             char       *colname = get_attname(context->parentid, attnum, false);
  411 akapila                   245 ECB             : 
  411 akapila                   246 GIC           8 :             attnum = get_attnum(context->relid, colname);
  411 akapila                   247 ECB             :         }
                                248                 :         /* The bitmap is probed at attnum - FirstLowInvalidHeapAttributeNumber. */
  411 akapila                   249 GIC          52 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
  411 akapila                   250 CBC          52 :                            context->bms_replident))
                                251              30 :             return true;
  411 akapila                   252 ECB             :     }
                                253                 : 
  411 akapila                   254 GIC          99 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
  411 akapila                   255 ECB             :                                   (void *) context);
                                256                 : }
                                257                 : 
                                258                 : /*
                                259                 :  * Check if all columns referenced in the filter expression are part of the
                                260                 :  * REPLICA IDENTITY index or not.
                                261                 :  *
                                262                 :  * Returns true if any invalid column is found.
                                                    :  *
                                                    :  * Returns false without examining any filter when the relation uses
                                                    :  * REPLICA IDENTITY FULL, when the PUBLICATIONRELMAP lookup finds no entry,
                                                    :  * or when the entry's prqual attribute is null.
                                                    :  *
                                                    :  * 'ancestors' is consulted only when pubviaroot is true and the relation
                                                    :  * is a partition.
                                263                 :  */
                                264                 : bool
  379 tomas.vondra              265 GIC         321 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
  332 tgl                       266 ECB             :                                bool pubviaroot)
                                267                 : {
                                268                 :     HeapTuple   rftuple;
  411 akapila                   269 GIC         321 :     Oid         relid = RelationGetRelid(relation);
  411 akapila                   270 CBC         321 :     Oid         publish_as_relid = RelationGetRelid(relation);
                                271             321 :     bool        result = false;
  411 akapila                   272 ECB             :     Datum       rfdatum;
                                273                 :     bool        rfisnull;
                                274                 : 
                                275                 :     /*
                                276                 :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
                                277                 :      * allowed in the row filter and we can skip the validation.
                                278                 :      */
  411 akapila                   279 GIC         321 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
  411 akapila                   280 CBC          72 :         return false;
  411 akapila                   281 ECB             : 
                                282                 :     /*
                                283                 :      * For a partition, if pubviaroot is true, find the topmost ancestor that
                                284                 :      * is published via this publication as we need to use its row filter
                                285                 :      * expression to filter the partition's changes.
                                286                 :      *
                                287                 :      * Note that even though the row filter used is for an ancestor, the
                                288                 :      * REPLICA IDENTITY used will be for the actual child table.
                                289                 :      */
  411 akapila                   290 GIC         249 :     if (pubviaroot && relation->rd_rel->relispartition)
  411 akapila                   291 ECB             :     {
                                292                 :         publish_as_relid
  389 tomas.vondra              293 GIC          53 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
  411 akapila                   294 ECB             : 
  411 akapila                   295 GIC          53 :         if (!OidIsValid(publish_as_relid))
  411 akapila                   296 CBC           3 :             publish_as_relid = relid;
  411 akapila                   297 ECB             :     }
                                298                 : 
  411 akapila                   299 GIC         249 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
  411 akapila                   300 ECB             :                               ObjectIdGetDatum(publish_as_relid),
                                301                 :                               ObjectIdGetDatum(pubid));
                                302                 : 
  411 akapila                   303 GIC         249 :     if (!HeapTupleIsValid(rftuple))
  411 akapila                   304 CBC          28 :         return false;
  411 akapila                   305 ECB             : 
  411 akapila                   306 GIC         221 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
  411 akapila                   307 ECB             :                               Anum_pg_publication_rel_prqual,
                                308                 :                               &rfisnull);
                                309                 :     /* A null prqual leaves result false: there is no filter to walk. */
  411 akapila                   310 GIC         221 :     if (!rfisnull)
  411 akapila                   311 ECB             :     {
  411 akapila                   312 GIC          51 :         rf_context  context = {0};
  411 akapila                   313 ECB             :         Node       *rfnode;
  411 akapila                   314 GIC          51 :         Bitmapset  *bms = NULL;
  411 akapila                   315 ECB             : 
  411 akapila                   316 GIC          51 :         context.pubviaroot = pubviaroot;
  411 akapila                   317 CBC          51 :         context.parentid = publish_as_relid;
                                318              51 :         context.relid = relid;
  411 akapila                   319 ECB             : 
                                320                 :         /* Remember columns that are part of the REPLICA IDENTITY */
  411 akapila                   321 GIC          51 :         bms = RelationGetIndexAttrBitmap(relation,
  411 akapila                   322 ECB             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
                                323                 : 
  411 akapila                   324 GIC          51 :         context.bms_replident = bms;
  411 akapila                   325 CBC          51 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
                                326              51 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
  411 akapila                   327 ECB             :     }
                                328                 : 
  411 akapila                   329 GIC         221 :     ReleaseSysCache(rftuple);
  411 akapila                   330 ECB             : 
  411 akapila                   331 GIC         221 :     return result;
  411 akapila                   332 ECB             : }
                                333                 : 
                                334                 : /*
                                335                 :  * Check if all columns referenced in the REPLICA IDENTITY are covered by
                                336                 :  * the column list.
                                337                 :  *
                                338                 :  * Returns true if any replica identity column is not covered by column list.
                                339                 :  */
                                340                 : bool
  379 tomas.vondra              341 GIC         321 : pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
  332 tgl                       342 ECB             :                                     bool pubviaroot)
                                343                 : {
                                344                 :     HeapTuple   tuple;
  379 tomas.vondra              345 GIC         321 :     Oid         relid = RelationGetRelid(relation);
  379 tomas.vondra              346 CBC         321 :     Oid         publish_as_relid = RelationGetRelid(relation);
                                347             321 :     bool        result = false;
  379 tomas.vondra              348 ECB             :     Datum       datum;
                                349                 :     bool        isnull;
                                350                 : 
                                351                 :     /*
                                352                 :      * For a partition, if pubviaroot is true, find the topmost ancestor that
                                353                 :      * is published via this publication as we need to use its column list for
                                354                 :      * the changes.
                                355                 :      *
                                356                 :      * Note that even though the column list used is for an ancestor, the
                                357                 :      * REPLICA IDENTITY used will be for the actual child table.
                                358                 :      */
  379 tomas.vondra              359 GIC         321 :     if (pubviaroot && relation->rd_rel->relispartition)
  379 tomas.vondra              360 ECB             :     {
  379 tomas.vondra              361 GIC          64 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
  379 tomas.vondra              362 ECB             : 
  379 tomas.vondra              363 GIC          64 :         if (!OidIsValid(publish_as_relid))
  379 tomas.vondra              364 CBC           3 :             publish_as_relid = relid;
  379 tomas.vondra              365 ECB             :     }
                                366                 : 
  379 tomas.vondra              367 GIC         321 :     tuple = SearchSysCache2(PUBLICATIONRELMAP,
  332 tgl                       368 ECB             :                             ObjectIdGetDatum(publish_as_relid),
                                369                 :                             ObjectIdGetDatum(pubid));
                                370                 : 
  379 tomas.vondra              371 GIC         321 :     if (!HeapTupleIsValid(tuple))
  379 tomas.vondra              372 CBC          32 :         return false;
  379 tomas.vondra              373 ECB             : 
  379 tomas.vondra              374 GIC         289 :     datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
  332 tgl                       375 ECB             :                             Anum_pg_publication_rel_prattrs,
                                376                 :                             &isnull);
                                377                 : 
  379 tomas.vondra              378 GIC         289 :     if (!isnull)
  379 tomas.vondra              379 ECB             :     {
                                380                 :         int         x;
                                381                 :         Bitmapset  *idattrs;
  379 tomas.vondra              382 GIC         115 :         Bitmapset  *columns = NULL;
  379 tomas.vondra              383 ECB             : 
                                384                 :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
  379 tomas.vondra              385 GIC         115 :         if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
  379 tomas.vondra              386 CBC          18 :             result = true;
  379 tomas.vondra              387 ECB             : 
                                388                 :         /* Transform the column list datum to a bitmapset. */
  379 tomas.vondra              389 GIC         115 :         columns = pub_collist_to_bitmapset(NULL, datum, NULL);
  379 tomas.vondra              390 ECB             : 
                                391                 :         /* Remember columns that are part of the REPLICA IDENTITY */
  379 tomas.vondra              392 GIC         115 :         idattrs = RelationGetIndexAttrBitmap(relation,
  379 tomas.vondra              393 ECB             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
                                394                 : 
                                395                 :         /*
                                396                 :          * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
                                397                 :          * offset (to handle system columns the usual way), while the column
                                398                 :          * list does not use an offset, so we can't do bms_is_subset(). Instead,
                                399                 :          * we have to loop over the idattrs and check all of them are in the
                                400                 :          * list.
                                401                 :          */
  379 tomas.vondra              402 GIC         115 :         x = -1;
  379 tomas.vondra              403 CBC         175 :         while ((x = bms_next_member(idattrs, x)) >= 0)
  379 tomas.vondra              404 ECB             :         {
  379 tomas.vondra              405 GIC          96 :             AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
  379 tomas.vondra              406 ECB             : 
                                407                 :             /*
                                408                 :              * If pubviaroot is true, we are validating the column list of the
                                409                 :              * parent table, but the bitmap contains the replica identity
                                410                 :              * information of the child table. The parent/child attnums may
                                411                 :              * not match, so translate them to the parent - get the attname
                                412                 :              * from the child, and look it up in the parent.
                                413                 :              */
  379 tomas.vondra              414 GIC          96 :             if (pubviaroot)
  379 tomas.vondra              415 ECB             :             {
                                416                 :                 /* attribute name in the child table */
  332 tgl                       417 GIC          40 :                 char       *colname = get_attname(relid, attnum, false);
  379 tomas.vondra              418 ECB             : 
                                419                 :                 /*
                                420                 :                  * Determine the attnum for the attribute name in parent (we
                                421                 :                  * are using the column list defined on the parent).
                                422                 :                  */
  379 tomas.vondra              423 GIC          40 :                 attnum = get_attnum(publish_as_relid, colname);
  379 tomas.vondra              424 ECB             :             }
                                425                 : 
                                426                 :             /* replica identity column, not covered by the column list */
  379 tomas.vondra              427 GIC          96 :             if (!bms_is_member(attnum, columns))
  379 tomas.vondra              428 ECB             :             {
  379 tomas.vondra              429 GIC          36 :                 result = true;
  379 tomas.vondra              430 CBC          36 :                 break;
  379 tomas.vondra              431 ECB             :             }
                                432                 :         }
                                433                 : 
  379 tomas.vondra              434 GIC         115 :         bms_free(idattrs);
  379 tomas.vondra              435 CBC         115 :         bms_free(columns);
  379 tomas.vondra              436 ECB             :     }
                                437                 : 
  379 tomas.vondra              438 GIC         289 :     ReleaseSysCache(tuple);
  379 tomas.vondra              439 ECB             : 
  379 tomas.vondra              440 GIC         289 :     return result;
  379 tomas.vondra              441 ECB             : }
                                442                 : 
                                443                 : /* check_functions_in_node callback: flags mutable or user-defined functions */
                                444                 : static bool
  411 akapila                   445 GIC         198 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
  411 akapila                   446 ECB             : {
  411 akapila                   447 GIC         198 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
  411 akapila                   448 ECB             :             func_id >= FirstNormalObjectId);
                                449                 : }
                                450                 : 
                                451                 : /*
                                452                 :  * The row filter walker checks if the row filter expression is a "simple
                                453                 :  * expression".
                                454                 :  *
                                455                 :  * It allows only simple or compound expressions such as:
                                456                 :  * - (Var Op Const)
                                457                 :  * - (Var Op Var)
                                458                 :  * - (Var Op Const) AND/OR (Var Op Const)
                                459                 :  * - etc
                                460                 :  * (where Var is a column of the table this filter belongs to)
                                461                 :  *
                                462                 :  * The simple expression has the following restrictions:
                                463                 :  * - User-defined operators are not allowed;
                                464                 :  * - User-defined functions are not allowed;
                                465                 :  * - User-defined types are not allowed;
                                466                 :  * - User-defined collations are not allowed;
                                467                 :  * - Non-immutable built-in functions are not allowed;
                                468                 :  * - System columns are not allowed.
                                469                 :  *
                                470                 :  * NOTES
                                471                 :  *
                                472                 :  * We don't allow user-defined functions/operators/types/collations because
                                473                 :  * (a) if a user drops a user-defined object used in a row filter expression or
                                474                 :  * if there is any other error while using it, the logical decoding
                                475                 :  * infrastructure won't be able to recover from such an error even if the
                                476                 :  * object is recreated again because a historic snapshot is used to evaluate
                                477                 :  * the row filter;
                                478                 :  * (b) a user-defined function can be used to access tables that could have
                                479                 :  * unpleasant results because a historic snapshot is used. That's why only
                                480                 :  * immutable built-in functions are allowed in row filter expressions.
                                481                 :  *
                                482                 :  * We don't allow system columns because currently, we don't have that
                                483                 :  * information in the tuple passed to downstream. Also, as we don't replicate
                                484                 :  * those to subscribers, there doesn't seem to be a need for a filter on those
                                485                 :  * columns.
                                486                 :  *
                                487                 :  * We can allow other node types after more analysis and testing.
                                488                 :  */
                                489                 : static bool
  411 akapila                   490 GIC         664 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
  411 akapila                   491 ECB             : {
  411 akapila                   492 GIC         664 :     char       *errdetail_msg = NULL;
  411 akapila                   493 ECB             : 
  411 akapila                   494 GIC         664 :     if (node == NULL)
  411 akapila                   495 CBC           3 :         return false;
  411 akapila                   496 ECB             : 
  411 akapila                   497 GIC         661 :     switch (nodeTag(node))
  411 akapila                   498 ECB             :     {
  411 akapila                   499 GIC         178 :         case T_Var:
  411 akapila                   500 ECB             :             /* System columns are not allowed. */
  411 akapila                   501 GIC         178 :             if (((Var *) node)->varattno < InvalidAttrNumber)
  411 akapila                   502 CBC           3 :                 errdetail_msg = _("System columns are not allowed.");
                                503             178 :             break;
                                504             176 :         case T_OpExpr:
  411 akapila                   505 ECB             :         case T_DistinctExpr:
                                506                 :         case T_NullIfExpr:
                                507                 :             /* OK, except user-defined operators are not allowed. */
  411 akapila                   508 GIC         176 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
  411 akapila                   509 CBC           3 :                 errdetail_msg = _("User-defined operators are not allowed.");
                                510             176 :             break;
                                511               3 :         case T_ScalarArrayOpExpr:
  411 akapila                   512 ECB             :             /* OK, except user-defined operators are not allowed. */
  411 akapila                   513 GIC           3 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
  411 akapila                   514 LBC           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
  411 akapila                   515 EUB             : 
                                516                 :             /*
                                517                 :              * We don't need to check the hashfuncid and negfuncid of
                                518                 :              * ScalarArrayOpExpr as those functions are only built for a
                                519                 :              * subquery.
                                520                 :              */
  411 akapila                   521 GIC           3 :             break;
  411 akapila                   522 CBC           3 :         case T_RowCompareExpr:
  411 akapila                   523 ECB             :             {
                                524                 :                 ListCell   *opid;
                                525                 : 
                                526                 :                 /* OK, except user-defined operators are not allowed. */
  411 akapila                   527 GIC           9 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
  411 akapila                   528 ECB             :                 {
  411 akapila                   529 GIC           6 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
  411 akapila                   530 ECB             :                     {
  411 akapila                   531 UIC           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
  411 akapila                   532 UBC           0 :                         break;
  411 akapila                   533 EUB             :                     }
                                534                 :                 }
                                535                 :             }
  411 akapila                   536 GIC           3 :             break;
  411 akapila                   537 CBC         298 :         case T_Const:
  411 akapila                   538 ECB             :         case T_FuncExpr:
                                539                 :         case T_BoolExpr:
                                540                 :         case T_RelabelType:
                                541                 :         case T_CollateExpr:
                                542                 :         case T_CaseExpr:
                                543                 :         case T_CaseTestExpr:
                                544                 :         case T_ArrayExpr:
                                545                 :         case T_RowExpr:
                                546                 :         case T_CoalesceExpr:
                                547                 :         case T_MinMaxExpr:
                                548                 :         case T_XmlExpr:
                                549                 :         case T_NullTest:
                                550                 :         case T_BooleanTest:
                                551                 :         case T_List:
                                552                 :             /* OK, supported */
  411 akapila                   553 GIC         298 :             break;
  411 akapila                   554 CBC           3 :         default:
  197 peter                     555               3 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
  411 akapila                   556               3 :             break;
  411 akapila                   557 ECB             :     }
                                558                 : 
                                559                 :     /*
                                560                 :      * For all the supported nodes, if we haven't already found a problem,
                                561                 :      * check the types, functions, and collations used in it.  A List node
                                562                 :      * itself is skipped here; its elements are checked as the walker recurses.
                                563                 :      */
  193 alvherre                  564 GIC         661 :     if (!errdetail_msg && !IsA(node, List))
  193 alvherre                  565 ECB             :     {
  193 alvherre                  566 GIC         625 :         if (exprType(node) >= FirstNormalObjectId)
  193 alvherre                  567 CBC           3 :             errdetail_msg = _("User-defined types are not allowed.");
                                568             622 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
  193 alvherre                  569 ECB             :                                          (void *) pstate))
  193 alvherre                  570 GIC           6 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
  193 alvherre                  571 CBC        1232 :         else if (exprCollation(node) >= FirstNormalObjectId ||
                                572             616 :                  exprInputCollation(node) >= FirstNormalObjectId)
                                573               3 :             errdetail_msg = _("User-defined collations are not allowed.");
  193 alvherre                  574 ECB             :     }
                                575                 : 
                                576                 :     /*
                                577                 :      * If we found a problem in this node, throw error now. Otherwise keep
                                578                 :      * going.
                                579                 :      */
  411 akapila                   580 GIC         661 :     if (errdetail_msg)
  411 akapila                   581 CBC          21 :         ereport(ERROR,
  411 akapila                   582 ECB             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                583                 :                  errmsg("invalid publication WHERE expression"),
                                584                 :                  errdetail_internal("%s", errdetail_msg),
                                585                 :                  parser_errposition(pstate, exprLocation(node))));
                                586                 : 
  411 akapila                   587 GIC         640 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
  411 akapila                   588 ECB             :                                   (void *) pstate);
                                589                 : }
                                590                 : 
                                591                 : /*
                                592                 :  * Check if the row filter expression is a "simple expression".
                                593                 :  *
                                594                 :  * See check_simple_rowfilter_expr_walker for details.
                                595                 :  */
                                596                 : static bool
  411 akapila                   597 GIC         172 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
  411 akapila                   598 ECB             : {
  411 akapila                   599 GIC         172 :     return check_simple_rowfilter_expr_walker(node, pstate);
  411 akapila                   600 ECB             : }
                                601                 : 
                                602                 : /*
                                603                 :  * Transform the publication WHERE expression for all the relations in the list,
                                604                 :  * ensuring it is coerced to boolean and necessary collation information is
                                605                 :  * added if required, and add a new nsitem/RTE for the associated relation to
                                606                 :  * the ParseState's namespace list.
                                607                 :  *
                                608                 :  * Also check the publication row filter expression and throw an error if
                                609                 :  * anything not permitted or unexpected is encountered.
                                610                 :  */
                                611                 : static void
  411 akapila                   612 GIC         511 : TransformPubWhereClauses(List *tables, const char *queryString,
  411 akapila                   613 ECB             :                          bool pubviaroot)
                                614                 : {
                                615                 :     ListCell   *lc;
                                616                 : 
  411 akapila                   617 GIC        1022 :     foreach(lc, tables)
  411 akapila                   618 ECB             :     {
                                619                 :         ParseNamespaceItem *nsitem;
  411 akapila                   620 GIC         541 :         Node       *whereclause = NULL;
  411 akapila                   621 ECB             :         ParseState *pstate;
  411 akapila                   622 GIC         541 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
  411 akapila                   623 ECB             : 
  411 akapila                   624 GIC         541 :         if (pri->whereClause == NULL)
  411 akapila                   625 CBC         360 :             continue;
  411 akapila                   626 ECB             : 
                                627                 :         /*
                                628                 :          * If the publication doesn't publish changes via the root partitioned
                                629                 :          * table, the partition's row filter will be used. So disallow using
                                630                 :          * WHERE clause on partitioned table in this case.
                                631                 :          */
  411 akapila                   632 GIC         181 :         if (!pubviaroot &&
  411 akapila                   633 CBC         170 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
                                634               3 :             ereport(ERROR,
  411 akapila                   635 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                636                 :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
                                637                 :                             RelationGetRelationName(pri->relation)),
                                638                 :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
                                639                 :                                "publish_via_partition_root")));
                                640                 : 
                                641                 :         /*
                                642                 :          * A fresh pstate is required so that we only have "this" table in its
                                643                 :          * rangetable
                                644                 :          */
  411 akapila                   645 GIC         178 :         pstate = make_parsestate(NULL);
  411 akapila                   646 CBC         178 :         pstate->p_sourcetext = queryString;
                                647             178 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
  411 akapila                   648 ECB             :                                                AccessShareLock, NULL,
                                649                 :                                                false, false);
  411 akapila                   650 GIC         178 :         addNSItemToQuery(pstate, nsitem, false, true, true);
  411 akapila                   651 ECB             : 
  411 akapila                   652 GIC         178 :         whereclause = transformWhereClause(pstate,
  411 akapila                   653 CBC         178 :                                            copyObject(pri->whereClause),
  411 akapila                   654 ECB             :                                            EXPR_KIND_WHERE,
                                655                 :                                            "PUBLICATION WHERE");
                                656                 : 
                                657                 :         /* Fix up collation information */
  411 akapila                   658 GIC         172 :         assign_expr_collations(pstate, whereclause);
  411 akapila                   659 ECB             : 
                                660                 :         /*
                                661                 :          * We allow only simple expressions in row filters. See
                                662                 :          * check_simple_rowfilter_expr_walker.
                                663                 :          */
  411 akapila                   664 GIC         172 :         check_simple_rowfilter_expr(whereclause, pstate);
  411 akapila                   665 ECB             : 
  411 akapila                   666 GIC         151 :         free_parsestate(pstate);
  411 akapila                   667 ECB             : 
  411 akapila                   668 GIC         151 :         pri->whereClause = whereclause;
  411 akapila                   669 ECB             :     }
  411 akapila                   670 GIC         481 : }
  411 akapila                   671 ECB             : 
                                672                 : 
                                673                 : /*
                                674                 :  * Given a list of tables that are going to be added to a publication,
                                675                 :  * verify that they fulfill the necessary preconditions, namely: no tables
                                676                 :  * have a column list if any schema is published; and partitioned tables do
                                677                 :  * not have column lists if publish_via_partition_root is not set.
                                678                 :  *
                                679                 :  * 'pubname' is used only in error messages.  'publish_schema' indicates that
                                680                 :  * the publication contains any TABLES IN SCHEMA elements (newly added in this
                                681                 :  * command, or preexisting).  'pubviaroot' is publish_via_partition_root's value.
                                682                 :  */
                                683                 : static void
  194 alvherre                  684 GIC         481 : CheckPubRelationColumnList(char *pubname, List *tables,
  198 akapila                   685 ECB             :                            bool publish_schema, bool pubviaroot)
                                686                 : {
                                687                 :     ListCell   *lc;
                                688                 : 
  379 tomas.vondra              689 GIC         977 :     foreach(lc, tables)
  379 tomas.vondra              690 ECB             :     {
  379 tomas.vondra              691 GIC         511 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
  379 tomas.vondra              692 ECB             : 
  379 tomas.vondra              693 GIC         511 :         if (pri->columns == NIL)
  379 tomas.vondra              694 CBC         348 :             continue;
  379 tomas.vondra              695 ECB             : 
                                696                 :         /*
                                697                 :          * Disallow specifying column list if any schema is in the
                                698                 :          * publication.
                                699                 :          *
                                700                 :          * XXX We could instead just forbid the case when the publication
                                701                 :          * tries to publish the table with a column list and a schema for that
                                702                 :          * table. However, if we do that then we need a restriction during
                                703                 :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
                                704                 :          * seem to be a good idea.
                                705                 :          */
  198 akapila                   706 GIC         163 :         if (publish_schema)
  198 akapila                   707 CBC          12 :             ereport(ERROR,
  198 akapila                   708 ECB             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                709                 :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
                                710                 :                            get_namespace_name(RelationGetNamespace(pri->relation)),
                                711                 :                            RelationGetRelationName(pri->relation), pubname),
                                712                 :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
                                713                 : 
                                714                 :         /*
                                715                 :          * If the publication doesn't publish changes via the root partitioned
                                716                 :          * table, the partition's column list will be used. So disallow using
                                717                 :          * a column list on the partitioned table in this case.
                                718                 :          */
  379 tomas.vondra              719 GIC         151 :         if (!pubviaroot &&
  379 tomas.vondra              720 CBC         113 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
                                721               3 :             ereport(ERROR,
  379 tomas.vondra              722 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                723                 :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
                                724                 :                             get_namespace_name(RelationGetNamespace(pri->relation)),
                                725                 :                             RelationGetRelationName(pri->relation), pubname),
                                726                 :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
                                727                 :                                "publish_via_partition_root")));
                                728                 :     }
  379 tomas.vondra              729 GIC         466 : }
  379 tomas.vondra              730 ECB             : 
                                731                 : /*
                                732                 :  * Create new publication.
                                733                 :  */
                                734                 : ObjectAddress
  633 dean.a.rasheed            735 GIC         325 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 2271 peter_e                   736 ECB             : {
                                737                 :     Relation    rel;
                                738                 :     ObjectAddress myself;
                                739                 :     Oid         puboid;
                                740                 :     bool        nulls[Natts_pg_publication];
                                741                 :     Datum       values[Natts_pg_publication];
                                742                 :     HeapTuple   tup;
                                743                 :     bool        publish_given;
                                744                 :     PublicationActions pubactions;
                                745                 :     bool        publish_via_partition_root_given;
                                746                 :     bool        publish_via_partition_root;
                                747                 :     AclResult   aclresult;
  367 tomas.vondra              748 GIC         325 :     List       *relations = NIL;
  367 tomas.vondra              749 CBC         325 :     List       *schemaidlist = NIL;
  381 tomas.vondra              750 ECB             : 
                                751                 :     /* must have CREATE privilege on database */
  147 peter                     752 GNC         325 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
 2271 peter_e                   753 CBC         325 :     if (aclresult != ACLCHECK_OK)
 1954                           754               3 :         aclcheck_error(aclresult, OBJECT_DATABASE,
 2271                           755               3 :                        get_database_name(MyDatabaseId));
 2271 peter_e                   756 ECB             : 
                                757                 :     /* FOR ALL TABLES requires superuser */
  367 tomas.vondra              758 GIC         322 :     if (stmt->for_all_tables && !superuser())
 2271 peter_e                   759 LBC           0 :         ereport(ERROR,
 2271 peter_e                   760 EUB             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                761                 :                  errmsg("must be superuser to create FOR ALL TABLES publication")));
                                762                 : 
 1539 andres                    763 GIC         322 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
 2271 peter_e                   764 ECB             : 
                                765                 :     /* Check if name is used */
 1601 andres                    766 GIC         322 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
 1601 andres                    767 ECB             :                              CStringGetDatum(stmt->pubname));
 2271 peter_e                   768 GIC         322 :     if (OidIsValid(puboid))
 2271 peter_e                   769 CBC           3 :         ereport(ERROR,
 2271 peter_e                   770 ECB             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                                771                 :                  errmsg("publication \"%s\" already exists",
                                772                 :                         stmt->pubname)));
                                773                 : 
                                774                 :     /* Form a tuple. */
 2271 peter_e                   775 GIC         319 :     memset(values, 0, sizeof(values));
 2271 peter_e                   776 CBC         319 :     memset(nulls, false, sizeof(nulls));
 2271 peter_e                   777 ECB             : 
 2271 peter_e                   778 GIC         319 :     values[Anum_pg_publication_pubname - 1] =
 2271 peter_e                   779 CBC         319 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
                                780             319 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
 2271 peter_e                   781 ECB             : 
  633 dean.a.rasheed            782 GIC         319 :     parse_publication_options(pstate,
  633 dean.a.rasheed            783 ECB             :                               stmt->options,
                                784                 :                               &publish_given, &pubactions,
                                785                 :                               &publish_via_partition_root_given,
                                786                 :                               &publish_via_partition_root);
                                787                 : 
 1601 andres                    788 GIC         310 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
 1601 andres                    789 ECB             :                                 Anum_pg_publication_oid);
 1601 andres                    790 GIC         310 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
 2271 peter_e                   791 CBC         310 :     values[Anum_pg_publication_puballtables - 1] =
  367 tomas.vondra              792             310 :         BoolGetDatum(stmt->for_all_tables);
 2271 peter_e                   793             310 :     values[Anum_pg_publication_pubinsert - 1] =
 1096 peter                     794             310 :         BoolGetDatum(pubactions.pubinsert);
 2271 peter_e                   795             310 :     values[Anum_pg_publication_pubupdate - 1] =
 1096 peter                     796             310 :         BoolGetDatum(pubactions.pubupdate);
 2271 peter_e                   797             310 :     values[Anum_pg_publication_pubdelete - 1] =
 1096 peter                     798             310 :         BoolGetDatum(pubactions.pubdelete);
 1828 peter_e                   799             310 :     values[Anum_pg_publication_pubtruncate - 1] =
 1096 peter                     800             310 :         BoolGetDatum(pubactions.pubtruncate);
                                801             310 :     values[Anum_pg_publication_pubviaroot - 1] =
                                802             310 :         BoolGetDatum(publish_via_partition_root);
 2271 peter_e                   803 ECB             : 
 2271 peter_e                   804 GIC         310 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 2271 peter_e                   805 ECB             : 
                                806                 :     /* Insert tuple into catalog. */
 1601 andres                    807 GIC         310 :     CatalogTupleInsert(rel, tup);
 2271 peter_e                   808 CBC         310 :     heap_freetuple(tup);
 2271 peter_e                   809 ECB             : 
 2270 alvherre                  810 GIC         310 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
 2270 alvherre                  811 ECB             : 
 2271 peter_e                   812 GIC         310 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
 2271 peter_e                   813 ECB             : 
                                814                 :     /* Make the changes visible. */
 2271 peter_e                   815 GIC         310 :     CommandCounterIncrement();
 2271 peter_e                   816 ECB             : 
                                817                 :     /* Associate objects with the publication. */
  367 tomas.vondra              818 GIC         310 :     if (stmt->for_all_tables)
  578 akapila                   819 ECB             :     {
                                820                 :         /* Invalidate relcache so that publication info is rebuilt. */
  578 akapila                   821 GIC          24 :         CacheInvalidateRelcacheAll();
  578 akapila                   822 ECB             :     }
                                823                 :     else
                                824                 :     {
  367 tomas.vondra              825 GIC         286 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
  367 tomas.vondra              826 ECB             :                                    &schemaidlist);
                                827                 : 
                                828                 :         /* FOR TABLES IN SCHEMA requires superuser */
  235 tgl                       829 GIC         277 :         if (schemaidlist != NIL && !superuser())
  529 akapila                   830 CBC           3 :             ereport(ERROR,
  529 akapila                   831 ECB             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                832                 :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
                                833                 : 
  235 tgl                       834 GNC         274 :         if (relations != NIL)
  529 akapila                   835 ECB             :         {
                                836                 :             List       *rels;
                                837                 : 
  367 tomas.vondra              838 GIC         181 :             rels = OpenTableList(relations);
  411 akapila                   839 CBC         169 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
  411 akapila                   840 ECB             :                                      publish_via_partition_root);
                                841                 : 
  194 alvherre                  842 GIC         157 :             CheckPubRelationColumnList(stmt->pubname, rels,
  198 akapila                   843 ECB             :                                        schemaidlist != NIL,
                                844                 :                                        publish_via_partition_root);
                                845                 : 
  367 tomas.vondra              846 GIC         154 :             PublicationAddTables(puboid, rels, true, NULL);
  367 tomas.vondra              847 CBC         141 :             CloseTableList(rels);
  381 tomas.vondra              848 ECB             :         }
                                849                 : 
  235 tgl                       850 GNC         234 :         if (schemaidlist != NIL)
  381 tomas.vondra              851 ECB             :         {
                                852                 :             /*
                                853                 :              * Schema lock is held until the publication is created to prevent
                                854                 :              * concurrent schema deletion.
                                855                 :              */
  367 tomas.vondra              856 GIC          67 :             LockSchemaList(schemaidlist);
  367 tomas.vondra              857 CBC          67 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
  529 akapila                   858 ECB             :         }
                                859                 :     }
                                860                 : 
 1539 andres                    861 GIC         255 :     table_close(rel, RowExclusiveLock);
 2271 peter_e                   862 ECB             : 
 2271 peter_e                   863 GIC         255 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
 2271 peter_e                   864 ECB             : 
 1366 tmunro                    865 GIC         255 :     if (wal_level != WAL_LEVEL_LOGICAL)
 1366 tmunro                    866 CBC         153 :         ereport(WARNING,
 1366 tmunro                    867 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                868                 :                  errmsg("wal_level is insufficient to publish logical changes"),
                                869                 :                  errhint("Set wal_level to \"logical\" before creating subscriptions.")));
                                870                 : 
 2271 peter_e                   871 GIC         255 :     return myself;
 2271 peter_e                   872 ECB             : }
                                873                 : 
                                874                 : /*
                                875                 :  * Change options of a publication.
                                876                 :  */
                                877                 : static void
  633 dean.a.rasheed            878 GIC          55 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
  633 dean.a.rasheed            879 ECB             :                         Relation rel, HeapTuple tup)
                                880                 : {
                                881                 :     bool        nulls[Natts_pg_publication];
                                882                 :     bool        replaces[Natts_pg_publication];
                                883                 :     Datum       values[Natts_pg_publication];
                                884                 :     bool        publish_given;
                                885                 :     PublicationActions pubactions;
                                886                 :     bool        publish_via_partition_root_given;
                                887                 :     bool        publish_via_partition_root;
                                888                 :     ObjectAddress obj;
                                889                 :     Form_pg_publication pubform;
  411 akapila                   890 GIC          55 :     List       *root_relids = NIL;
  411 akapila                   891 ECB             :     ListCell   *lc;
                                892                 : 
  633 dean.a.rasheed            893 GIC          55 :     parse_publication_options(pstate,
  633 dean.a.rasheed            894 ECB             :                               stmt->options,
                                895                 :                               &publish_given, &pubactions,
                                896                 :                               &publish_via_partition_root_given,
                                897                 :                               &publish_via_partition_root);
                                898                 : 
  411 akapila                   899 GIC          55 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
  411 akapila                   900 ECB             : 
                                901                 :     /*
                                902                 :      * If the publication doesn't publish changes via the root partitioned
                                903                 :      * table, the partition's row filter and column list will be used. So
                                904                 :      * disallow using WHERE clause and column lists on partitioned table in
                                905                 :      * this case.
                                906                 :      */
  411 akapila                   907 GIC          55 :     if (!pubform->puballtables && publish_via_partition_root_given &&
  411 akapila                   908 CBC          40 :         !publish_via_partition_root)
  411 akapila                   909 ECB             :     {
                                910                 :         /*
                                911                 :          * Lock the publication so nobody else can do anything with it. This
                                912                 :          * prevents concurrent alter to add partitioned table(s) with WHERE
                                913                 :          * clause(s) and/or column lists which we don't allow when not
                                914                 :          * publishing via root.
                                915                 :          */
  411 akapila                   916 GIC          24 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
  411 akapila                   917 ECB             :                            AccessShareLock);
                                918                 : 
  411 akapila                   919 GIC          24 :         root_relids = GetPublicationRelations(pubform->oid,
  411 akapila                   920 ECB             :                                               PUBLICATION_PART_ROOT);
                                921                 : 
  411 akapila                   922 GIC          42 :         foreach(lc, root_relids)
  411 akapila                   923 ECB             :         {
  411 akapila                   924 GIC          24 :             Oid         relid = lfirst_oid(lc);
  361 alvherre                  925 ECB             :             HeapTuple   rftuple;
                                926                 :             char        relkind;
                                927                 :             char       *relname;
                                928                 :             bool        has_rowfilter;
                                929                 :             bool        has_collist;
                                930                 : 
                                931                 :             /*
                                932                 :              * Beware: we don't have lock on the relations, so cope silently
                                933                 :              * with the cache lookups returning NULL.
                                934                 :              */
                                935                 : 
  411 akapila                   936 GIC          24 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
  411 akapila                   937 ECB             :                                       ObjectIdGetDatum(relid),
                                938                 :                                       ObjectIdGetDatum(pubform->oid));
  361 alvherre                  939 GIC          24 :             if (!HeapTupleIsValid(rftuple))
  361 alvherre                  940 LBC           0 :                 continue;
  361 alvherre                  941 GBC          24 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
  361 alvherre                  942 CBC          24 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
                                943              24 :             if (!has_rowfilter && !has_collist)
  411 akapila                   944 ECB             :             {
  361 alvherre                  945 GIC           6 :                 ReleaseSysCache(rftuple);
  361 alvherre                  946 CBC           6 :                 continue;
  361 alvherre                  947 ECB             :             }
                                948                 : 
  361 alvherre                  949 GIC          18 :             relkind = get_rel_relkind(relid);
  361 alvherre                  950 CBC          18 :             if (relkind != RELKIND_PARTITIONED_TABLE)
  361 alvherre                  951 ECB             :             {
  361 alvherre                  952 GIC          12 :                 ReleaseSysCache(rftuple);
  361 alvherre                  953 CBC          12 :                 continue;
  361 alvherre                  954 ECB             :             }
  361 alvherre                  955 GIC           6 :             relname = get_rel_name(relid);
  361 alvherre                  956 CBC           6 :             if (relname == NULL)    /* table concurrently dropped */
  361 alvherre                  957 ECB             :             {
  411 akapila                   958 UIC           0 :                 ReleaseSysCache(rftuple);
  361 alvherre                  959 UBC           0 :                 continue;
  411 akapila                   960 EUB             :             }
                                961                 : 
  361 alvherre                  962 GIC           6 :             if (has_rowfilter)
  361 alvherre                  963 CBC           3 :                 ereport(ERROR,
  361 alvherre                  964 ECB             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                965                 :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
                                966                 :                                 "publish_via_partition_root",
                                967                 :                                 stmt->pubname),
                                968                 :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
                                969                 :                                    relname, "publish_via_partition_root")));
  361 alvherre                  970 GIC           3 :             Assert(has_collist);
  361 alvherre                  971 CBC           3 :             ereport(ERROR,
  361 alvherre                  972 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                973                 :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
                                974                 :                             "publish_via_partition_root",
                                975                 :                             stmt->pubname),
                                976                 :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
                                977                 :                                relname, "publish_via_partition_root")));
                                978                 :         }
                                979                 :     }
                                980                 : 
                                981                 :     /* Everything ok, form a new tuple. */
 2271 peter_e                   982 GIC          49 :     memset(values, 0, sizeof(values));
 2271 peter_e                   983 CBC          49 :     memset(nulls, false, sizeof(nulls));
                                984              49 :     memset(replaces, false, sizeof(replaces));
 2271 peter_e                   985 ECB             : 
 2158 peter_e                   986 GIC          49 :     if (publish_given)
 2271 peter_e                   987 ECB             :     {
 1096 peter                     988 GIC          14 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
 2271 peter_e                   989 CBC          14 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
 2158 peter_e                   990 ECB             : 
 1096 peter                     991 GIC          14 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
 2271 peter_e                   992 CBC          14 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
 2158 peter_e                   993 ECB             : 
 1096 peter                     994 GIC          14 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
 2271 peter_e                   995 CBC          14 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
 1828 peter_e                   996 ECB             : 
 1096 peter                     997 GIC          14 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
 1828 peter_e                   998 CBC          14 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
 2271 peter_e                   999 ECB             :     }
                               1000                 : 
 1096 peter                    1001 GIC          49 :     if (publish_via_partition_root_given)
 1096 peter                    1002 ECB             :     {
 1096 peter                    1003 GIC          35 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
 1096 peter                    1004 CBC          35 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
 1096 peter                    1005 ECB             :     }
                               1006                 : 
 2271 peter_e                  1007 GIC          49 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 2271 peter_e                  1008 ECB             :                             replaces);
                               1009                 : 
                               1010                 :     /* Update the catalog. */
 2259 alvherre                 1011 GIC          49 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
 2271 peter_e                  1012 ECB             : 
 2271 peter_e                  1013 GIC          49 :     CommandCounterIncrement();
 2271 peter_e                  1014 ECB             : 
 1601 andres                   1015 GIC          49 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
 1601 andres                   1016 ECB             : 
                               1017                 :     /* Invalidate the relcache. */
  367 tomas.vondra             1018 GIC          49 :     if (pubform->puballtables)
 2271 peter_e                  1019 ECB             :     {
 2271 peter_e                  1020 GIC           4 :         CacheInvalidateRelcacheAll();
 2271 peter_e                  1021 ECB             :     }
                               1022                 :     else
                               1023                 :     {
  529 akapila                  1024 GIC          45 :         List       *relids = NIL;
  529 akapila                  1025 CBC          45 :         List       *schemarelids = NIL;
  529 akapila                  1026 ECB             : 
                               1027                 :         /*
                               1028                 :          * For any partitioned tables contained in the publication, we must
                               1029                 :          * invalidate all partitions contained in the respective partition
                               1030                 :          * trees, not just those explicitly mentioned in the publication.
                               1031                 :          */
  411 akapila                  1032 GIC          45 :         if (root_relids == NIL)
  411 akapila                  1033 CBC          27 :             relids = GetPublicationRelations(pubform->oid,
  411 akapila                  1034 ECB             :                                              PUBLICATION_PART_ALL);
                               1035                 :         else
                               1036                 :         {
                               1037                 :             /*
                               1038                 :              * We already got tables explicitly mentioned in the publication.
                               1039                 :              * Now get all partitions for the partitioned table in the list.
                               1040                 :              */
  411 akapila                  1041 GIC          36 :             foreach(lc, root_relids)
  411 akapila                  1042 CBC          18 :                 relids = GetPubPartitionOptionRelations(relids,
  411 akapila                  1043 ECB             :                                                         PUBLICATION_PART_ALL,
                               1044                 :                                                         lfirst_oid(lc));
                               1045                 :         }
                               1046                 : 
  529 akapila                  1047 GIC          45 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
  529 akapila                  1048 ECB             :                                                         PUBLICATION_PART_ALL);
  529 akapila                  1049 GIC          45 :         relids = list_concat_unique_oid(relids, schemarelids);
 2271 peter_e                  1050 ECB             : 
  564 akapila                  1051 GIC          45 :         InvalidatePublicationRels(relids);
 2271 peter_e                  1052 ECB             :     }
                               1053                 : 
 1601 andres                   1054 GIC          49 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
 2271 peter_e                  1055 CBC          49 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
 2271 peter_e                  1056 ECB             :                                      (Node *) stmt);
                               1057                 : 
 1601 andres                   1058 GIC          49 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
 2271 peter_e                  1059 CBC          49 : }
 2271 peter_e                  1060 ECB             : 
                               1061                 : /*
                               1062                 :  * Invalidate the relations.
                               1063                 :  */
                               1064                 : void
  564 akapila                  1065 GIC        1071 : InvalidatePublicationRels(List *relids)
  564 akapila                  1066 ECB             : {
                               1067                 :     /*
                               1068                 :      * We don't want to send too many individual messages, at some point it's
                               1069                 :      * cheaper to just reset whole relcache.
                               1070                 :      */
  564 akapila                  1071 GIC        1071 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
  564 akapila                  1072 ECB             :     {
                               1073                 :         ListCell   *lc;
                               1074                 : 
  564 akapila                  1075 GIC        9421 :         foreach(lc, relids)
  564 akapila                  1076 CBC        8350 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
  564 akapila                  1077 ECB             :     }
                               1078                 :     else
  564 akapila                  1079 UIC           0 :         CacheInvalidateRelcacheAll();
  564 akapila                  1080 GBC        1071 : }
  564 akapila                  1081 ECB             : 
                               1082                 : /*
                               1083                 :  * Add or remove table to/from publication.
                               1084                 :  */
                               1085                 : static void
  529 akapila                  1086 GIC         421 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
  198 akapila                  1087 ECB             :                        List *tables, const char *queryString,
                               1088                 :                        bool publish_schema)
                               1089                 : {
 2271 peter_e                  1090 GIC         421 :     List       *rels = NIL;
 2271 peter_e                  1091 CBC         421 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 1601 andres                   1092             421 :     Oid         pubid = pubform->oid;
 2271 peter_e                  1093 ECB             : 
                               1094                 :     /*
                               1095                 :      * Nothing to do if no objects, except in SET: for that it is quite
                               1096                 :      * possible that user has not specified any tables in which case we need
                               1097                 :      * to remove all the existing tables.
                               1098                 :      */
  461 alvherre                 1099 GIC         421 :     if (!tables && stmt->action != AP_SetObjects)
  529 akapila                  1100 CBC          33 :         return;
 2271 peter_e                  1101 ECB             : 
  367 tomas.vondra             1102 GIC         388 :     rels = OpenTableList(tables);
 2271 peter_e                  1103 ECB             : 
  461 alvherre                 1104 GIC         388 :     if (stmt->action == AP_AddObjects)
  529 akapila                  1105 ECB             :     {
  411 akapila                  1106 GIC         131 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
  411 akapila                  1107 ECB             : 
  198 akapila                  1108 GIC         122 :         publish_schema |= is_schema_publication(pubid);
  198 akapila                  1109 ECB             : 
  194 alvherre                 1110 GIC         122 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
  198 akapila                  1111 CBC         122 :                                    pubform->pubviaroot);
  379 tomas.vondra             1112 ECB             : 
  367 tomas.vondra             1113 GIC         116 :         PublicationAddTables(pubid, rels, false, stmt);
  529 akapila                  1114 ECB             :     }
  461 alvherre                 1115 GIC         257 :     else if (stmt->action == AP_DropObjects)
  367 tomas.vondra             1116 CBC          46 :         PublicationDropTables(pubid, rels, false);
  461 alvherre                 1117 ECB             :     else                        /* AP_SetObjects */
                               1118                 :     {
 1060 tgl                      1119 GIC         211 :         List       *oldrelids = GetPublicationRelations(pubid,
 1060 tgl                      1120 ECB             :                                                         PUBLICATION_PART_ROOT);
 2271 peter_e                  1121 GIC         211 :         List       *delrels = NIL;
 2271 peter_e                  1122 ECB             :         ListCell   *oldlc;
                               1123                 : 
  411 akapila                  1124 GIC         211 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
  411 akapila                  1125 ECB             : 
  194 alvherre                 1126 GIC         202 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
  198 akapila                  1127 CBC         202 :                                    pubform->pubviaroot);
  379 tomas.vondra             1128 ECB             : 
                               1129                 :         /*
                               1130                 :          * To recreate the relation list for the publication, look for
                               1131                 :          * existing relations that do not need to be dropped.
                               1132                 :          */
 2271 peter_e                  1133 GIC         385 :         foreach(oldlc, oldrelids)
 2271 peter_e                  1134 ECB             :         {
 2271 peter_e                  1135 GIC         189 :             Oid         oldrelid = lfirst_oid(oldlc);
 2271 peter_e                  1136 ECB             :             ListCell   *newlc;
                               1137                 :             PublicationRelInfo *oldrel;
 2271 peter_e                  1138 GIC         189 :             bool        found = false;
  411 akapila                  1139 ECB             :             HeapTuple   rftuple;
  411 akapila                  1140 GIC         189 :             Node       *oldrelwhereclause = NULL;
  379 tomas.vondra             1141 CBC         189 :             Bitmapset  *oldcolumns = NULL;
  411 akapila                  1142 ECB             : 
                               1143                 :             /* look up the cache for the old relmap */
  411 akapila                  1144 GIC         189 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
  411 akapila                  1145 ECB             :                                       ObjectIdGetDatum(oldrelid),
                               1146                 :                                       ObjectIdGetDatum(pubid));
                               1147                 : 
                               1148                 :             /*
                               1149                 :              * See if the existing relation currently has a WHERE clause or a
                               1150                 :              * column list. We need to compare those too.
                               1151                 :              */
  411 akapila                  1152 GIC         189 :             if (HeapTupleIsValid(rftuple))
  411 akapila                  1153 ECB             :             {
  379 tomas.vondra             1154 GIC         189 :                 bool        isnull = true;
  411 akapila                  1155 ECB             :                 Datum       whereClauseDatum;
                               1156                 :                 Datum       columnListDatum;
                               1157                 : 
                               1158                 :                 /* Load the WHERE clause for this table. */
  411 akapila                  1159 GIC         189 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
  411 akapila                  1160 ECB             :                                                    Anum_pg_publication_rel_prqual,
                               1161                 :                                                    &isnull);
  379 tomas.vondra             1162 GIC         189 :                 if (!isnull)
  411 akapila                  1163 CBC         105 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
  411 akapila                  1164 ECB             : 
                               1165                 :                 /* Transform the int2vector column list to a bitmap. */
  379 tomas.vondra             1166 GIC         189 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
  332 tgl                      1167 ECB             :                                                   Anum_pg_publication_rel_prattrs,
                               1168                 :                                                   &isnull);
                               1169                 : 
  379 tomas.vondra             1170 GIC         189 :                 if (!isnull)
  379 tomas.vondra             1171 CBC          62 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
  379 tomas.vondra             1172 ECB             : 
  411 akapila                  1173 GIC         189 :                 ReleaseSysCache(rftuple);
  411 akapila                  1174 ECB             :             }
                               1175                 : 
 2271 peter_e                  1176 GIC         371 :             foreach(newlc, rels)
 2271 peter_e                  1177 ECB             :             {
                               1178                 :                 PublicationRelInfo *newpubrel;
                               1179                 :                 Oid         newrelid;
  332 tgl                      1180 GIC         190 :                 Bitmapset  *newcolumns = NULL;
 2271 peter_e                  1181 ECB             : 
  580 alvherre                 1182 GIC         190 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
  379 tomas.vondra             1183 CBC         190 :                 newrelid = RelationGetRelid(newpubrel->relation);
  379 tomas.vondra             1184 ECB             : 
                               1185                 :                 /*
                               1186                 :                  * If the new publication has column list, transform it to a
                               1187                 :                  * bitmap too.
                               1188                 :                  */
  379 tomas.vondra             1189 GIC         190 :                 if (newpubrel->columns)
  379 tomas.vondra             1190 ECB             :                 {
                               1191                 :                     ListCell   *lc;
                               1192                 : 
  379 tomas.vondra             1193 GIC         177 :                     foreach(lc, newpubrel->columns)
  379 tomas.vondra             1194 ECB             :                     {
  379 tomas.vondra             1195 GIC         109 :                         char       *colname = strVal(lfirst(lc));
  379 tomas.vondra             1196 CBC         109 :                         AttrNumber  attnum = get_attnum(newrelid, colname);
  379 tomas.vondra             1197 ECB             : 
  379 tomas.vondra             1198 GIC         109 :                         newcolumns = bms_add_member(newcolumns, attnum);
  379 tomas.vondra             1199 ECB             :                     }
                               1200                 :                 }
                               1201                 : 
                               1202                 :                 /*
                               1203                 :                  * Check if any of the new set of relations matches with the
                               1204                 :                  * existing relations in the publication. Additionally, if the
                               1205                 :                  * relation has an associated WHERE clause, check the WHERE
                               1206                 :                  * expressions also match. Same for the column list. Drop the
                               1207                 :                  * rest.
                               1208                 :                  */
  580 alvherre                 1209 GIC         190 :                 if (RelationGetRelid(newpubrel->relation) == oldrelid)
 2271 peter_e                  1210 ECB             :                 {
  379 tomas.vondra             1211 GIC         130 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
  379 tomas.vondra             1212 CBC          34 :                         bms_equal(oldcolumns, newcolumns))
  411 akapila                  1213 ECB             :                     {
  411 akapila                  1214 GIC           8 :                         found = true;
  411 akapila                  1215 CBC           8 :                         break;
  411 akapila                  1216 ECB             :                     }
                               1217                 :                 }
                               1218                 :             }
                               1219                 : 
                               1220                 :             /*
                               1221                 :              * Add the non-matched relations to a list so that they can be
                               1222                 :              * dropped.
                               1223                 :              */
  411 akapila                  1224 GIC         189 :             if (!found)
  411 akapila                  1225 ECB             :             {
  411 akapila                  1226 GIC         181 :                 oldrel = palloc(sizeof(PublicationRelInfo));
  411 akapila                  1227 CBC         181 :                 oldrel->whereClause = NULL;
  379 tomas.vondra             1228             181 :                 oldrel->columns = NIL;
  411 akapila                  1229             181 :                 oldrel->relation = table_open(oldrelid,
  411 akapila                  1230 ECB             :                                               ShareUpdateExclusiveLock);
  411 akapila                  1231 GIC         181 :                 delrels = lappend(delrels, oldrel);
 2271 peter_e                  1232 ECB             :             }
                               1233                 :         }
                               1234                 : 
                               1235                 :         /* And drop them. */
  367 tomas.vondra             1236 GIC         196 :         PublicationDropTables(pubid, delrels, true);
 2271 peter_e                  1237 ECB             : 
                               1238                 :         /*
                               1239                 :          * Don't bother calculating the difference for adding, we'll catch and
                               1240                 :          * skip existing ones when doing catalog update.
                               1241                 :          */
  367 tomas.vondra             1242 GIC         196 :         PublicationAddTables(pubid, rels, true, stmt);
 2271 peter_e                  1243 ECB             : 
  367 tomas.vondra             1244 GIC         196 :         CloseTableList(delrels);
 2271 peter_e                  1245 ECB             :     }
                               1246                 : 
  367 tomas.vondra             1247 GIC         331 :     CloseTableList(rels);
 2271 peter_e                  1248 ECB             : }
                               1249                 : 
                               1250                 : /*
                               1251                 :  * Alter the publication schemas.
                               1252                 :  *
                               1253                 :  * Add or remove schemas to/from publication.
                               1254                 :  */
                               1255                 : static void
  529 akapila                  1256 GIC         364 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
  367 tomas.vondra             1257 ECB             :                         HeapTuple tup, List *schemaidlist)
                               1258                 : {
  529 akapila                  1259 GIC         364 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
  529 akapila                  1260 ECB             : 
                               1261                 :     /*
                               1262                 :      * Nothing to do if no objects, except in SET: for that it is quite
                               1263                 :      * possible that user has not specified any schemas in which case we need
                               1264                 :      * to remove all the existing schemas.
                               1265                 :      */
  461 alvherre                 1266 GIC         364 :     if (!schemaidlist && stmt->action != AP_SetObjects)
  529 akapila                  1267 CBC         135 :         return;
  529 akapila                  1268 ECB             : 
                               1269                 :     /*
                               1270                 :      * Schema lock is held until the publication is altered to prevent
                               1271                 :      * concurrent schema deletion.
                               1272                 :      */
  529 akapila                  1273 GIC         229 :     LockSchemaList(schemaidlist);
  461 alvherre                 1274 CBC         229 :     if (stmt->action == AP_AddObjects)
  529 akapila                  1275 ECB             :     {
                               1276                 :         ListCell   *lc;
                               1277                 :         List       *reloids;
                               1278                 : 
  367 tomas.vondra             1279 GIC          14 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
  529 akapila                  1280 ECB             : 
  198 akapila                  1281 GIC          18 :         foreach(lc, reloids)
  198 akapila                  1282 ECB             :         {
                               1283                 :             HeapTuple   coltuple;
                               1284                 : 
  198 akapila                  1285 GIC           7 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
  198 akapila                  1286 ECB             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
                               1287                 :                                        ObjectIdGetDatum(pubform->oid));
                               1288                 : 
  198 akapila                  1289 GIC           7 :             if (!HeapTupleIsValid(coltuple))
  198 akapila                  1290 LBC           0 :                 continue;
  198 akapila                  1291 EUB             : 
                               1292                 :             /*
                               1293                 :              * Disallow adding schema if column list is already part of the
                               1294                 :              * publication. See CheckPubRelationColumnList.
                               1295                 :              */
  198 akapila                  1296 GIC           7 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
  198 akapila                  1297 CBC           3 :                 ereport(ERROR,
  198 akapila                  1298 ECB             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                               1299                 :                         errmsg("cannot add schema to publication \"%s\"",
                               1300                 :                                stmt->pubname),
                               1301                 :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
                               1302                 : 
  198 akapila                  1303 GIC           4 :             ReleaseSysCache(coltuple);
  198 akapila                  1304 ECB             :         }
                               1305                 : 
  367 tomas.vondra             1306 GIC          11 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
  529 akapila                  1307 ECB             :     }
  461 alvherre                 1308 GIC         215 :     else if (stmt->action == AP_DropObjects)
  367 tomas.vondra             1309 CBC          19 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
  461 alvherre                 1310 ECB             :     else                        /* AP_SetObjects */
                               1311                 :     {
  367 tomas.vondra             1312 GIC         196 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
  529 akapila                  1313 CBC         196 :         List       *delschemas = NIL;
  529 akapila                  1314 ECB             : 
                               1315                 :         /* Identify which schemas should be dropped */
  529 akapila                  1316 GIC         196 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
  529 akapila                  1317 ECB             : 
                               1318                 :         /*
                               1319                 :          * Schema lock is held until the publication is altered to prevent
                               1320                 :          * concurrent schema deletion.
                               1321                 :          */
  529 akapila                  1322 GIC         196 :         LockSchemaList(delschemas);
  529 akapila                  1323 ECB             : 
                               1324                 :         /* And drop them */
  367 tomas.vondra             1325 GIC         196 :         PublicationDropSchemas(pubform->oid, delschemas, true);
  529 akapila                  1326 ECB             : 
                               1327                 :         /*
                               1328                 :          * Don't bother calculating the difference for adding, we'll catch and
                               1329                 :          * skip existing ones when doing catalog update.
                               1330                 :          */
  367 tomas.vondra             1331 GIC         196 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
  529 akapila                  1332 ECB             :     }
                               1333                 : }
                               1334                 : 
                               1335                 : /*
                               1336                 :  * Check if relations and schemas can be in a given publication and throw
                               1337                 :  * appropriate error if not.
                               1338                 :  */
                               1339                 : static void
  529 akapila                  1340 GIC         442 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
  367 tomas.vondra             1341 ECB             :                       List *tables, List *schemaidlist)
                               1342                 : {
  529 akapila                  1343 GIC         442 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
  529 akapila                  1344 ECB             : 
  461 alvherre                 1345 GIC         442 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
  367 tomas.vondra             1346 CBC          47 :         schemaidlist && !superuser())
  529 akapila                  1347               3 :         ereport(ERROR,
  529 akapila                  1348 ECB             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                               1349                 :                  errmsg("must be superuser to add or set schemas")));
                               1350                 : 
                               1351                 :     /*
                               1352                 :      * Check that user is allowed to manipulate the publication tables in
                               1353                 :      * schema
                               1354                 :      */
  367 tomas.vondra             1355 GIC         439 :     if (schemaidlist && pubform->puballtables)
  529 akapila                  1356 CBC           9 :         ereport(ERROR,
  529 akapila                  1357 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1358                 :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
                               1359                 :                         NameStr(pubform->pubname)),
                               1360                 :                  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
                               1361                 : 
                               1362                 :     /* Check that user is allowed to manipulate the publication tables. */
  529 akapila                  1363 GIC         430 :     if (tables && pubform->puballtables)
  529 akapila                  1364 CBC           9 :         ereport(ERROR,
  529 akapila                  1365 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1366                 :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
                               1367                 :                         NameStr(pubform->pubname)),
                               1368                 :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
  529 akapila                  1369 GIC         421 : }
  529 akapila                  1370 ECB             : 
                               1371                 : /*
                               1372                 :  * Alter the existing publication.
                               1373                 :  *
                               1374                 :  * This is the dispatcher function for AlterPublicationOptions,
                               1375                 :  * AlterPublicationSchemas and AlterPublicationTables.
                               1376                 :  */
                               1377                 : void
  633 dean.a.rasheed           1378 GIC         506 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 2271 peter_e                  1379 ECB             : {
                               1380                 :     Relation    rel;
                               1381                 :     HeapTuple   tup;
                               1382                 :     Form_pg_publication pubform;
                               1383                 : 
 1539 andres                   1384 GIC         506 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
 2271 peter_e                  1385 ECB             : 
 2271 peter_e                  1386 GIC         506 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
 2271 peter_e                  1387 ECB             :                               CStringGetDatum(stmt->pubname));
                               1388                 : 
 2271 peter_e                  1389 GIC         506 :     if (!HeapTupleIsValid(tup))
 2271 peter_e                  1390 LBC           0 :         ereport(ERROR,
 2271 peter_e                  1391 EUB             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                               1392                 :                  errmsg("publication \"%s\" does not exist",
                               1393                 :                         stmt->pubname)));
                               1394                 : 
 1601 andres                   1395 GIC         506 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
 1601 andres                   1396 ECB             : 
                               1397                 :     /* must be owner */
  147 peter                    1398 GNC         506 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
 1954 peter_e                  1399 LBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
 2271 peter_e                  1400 UBC           0 :                        stmt->pubname);
 2271 peter_e                  1401 EUB             : 
 2271 peter_e                  1402 GIC         506 :     if (stmt->options)
  633 dean.a.rasheed           1403 CBC          55 :         AlterPublicationOptions(pstate, stmt, rel, tup);
 2271 peter_e                  1404 ECB             :     else
                               1405                 :     {
  367 tomas.vondra             1406 GIC         451 :         List       *relations = NIL;
  367 tomas.vondra             1407 CBC         451 :         List       *schemaidlist = NIL;
  411 akapila                  1408             451 :         Oid         pubid = pubform->oid;
  529 akapila                  1409 ECB             : 
  367 tomas.vondra             1410 GIC         451 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
  367 tomas.vondra             1411 ECB             :                                    &schemaidlist);
                               1412                 : 
  367 tomas.vondra             1413 GIC         442 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
  529 akapila                  1414 ECB             : 
  411 akapila                  1415 GIC         421 :         heap_freetuple(tup);
  411 akapila                  1416 ECB             : 
                               1417                 :         /* Lock the publication so nobody else can do anything with it. */
  411 akapila                  1418 GIC         421 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
  529 akapila                  1419 ECB             :                            AccessExclusiveLock);
                               1420                 : 
                               1421                 :         /*
                               1422                 :          * It is possible that by the time we acquire the lock on publication,
                               1423                 :          * concurrent DDL has removed it. We can test this by checking the
                               1424                 :          * existence of publication. We get the tuple again to avoid the risk
                               1425                 :          * of any publication option getting changed.
                               1426                 :          */
  411 akapila                  1427 GIC         421 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
  411 akapila                  1428 CBC         421 :         if (!HeapTupleIsValid(tup))
  529 akapila                  1429 LBC           0 :             ereport(ERROR,
  529 akapila                  1430 EUB             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
                               1431                 :                     errmsg("publication \"%s\" does not exist",
                               1432                 :                            stmt->pubname));
                               1433                 : 
  198 akapila                  1434 GIC         421 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
  198 akapila                  1435 ECB             :                                schemaidlist != NIL);
  367 tomas.vondra             1436 GIC         364 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
  529 akapila                  1437 ECB             :     }
                               1438                 : 
                               1439                 :     /* Cleanup. */
 2271 peter_e                  1440 GIC         404 :     heap_freetuple(tup);
 1539 andres                   1441 CBC         404 :     table_close(rel, RowExclusiveLock);
 2271 peter_e                  1442             404 : }
 2271 peter_e                  1443 ECB             : 
                               1444                 : /*
                               1445                 :  * Remove relation from publication by mapping OID.
                               1446                 :  */
                               1447                 : void
 2271 peter_e                  1448 GIC         374 : RemovePublicationRelById(Oid proid)
 2271 peter_e                  1449 ECB             : {
                               1450                 :     Relation    rel;
                               1451                 :     HeapTuple   tup;
                               1452                 :     Form_pg_publication_rel pubrel;
  564 akapila                  1453 GIC         374 :     List       *relids = NIL;
 2271 peter_e                  1454 ECB             : 
 1539 andres                   1455 GIC         374 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 2271 peter_e                  1456 ECB             : 
 2271 peter_e                  1457 GIC         374 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
 2271 peter_e                  1458 ECB             : 
 2271 peter_e                  1459 GIC         374 :     if (!HeapTupleIsValid(tup))
 2271 peter_e                  1460 LBC           0 :         elog(ERROR, "cache lookup failed for publication table %u",
 2271 peter_e                  1461 EUB             :              proid);
                               1462                 : 
 2271 peter_e                  1463 GIC         374 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
 2271 peter_e                  1464 ECB             : 
                               1465                 :     /*
                               1466                 :      * Invalidate relcache so that publication info is rebuilt.
                               1467                 :      *
                               1468                 :      * For the partitioned tables, we must invalidate all partitions contained
                               1469                 :      * in the respective partition hierarchies, not just the one explicitly
                               1470                 :      * mentioned in the publication. This is required because we implicitly
                               1471                 :      * publish the child tables when the parent table is published.
                               1472                 :      */
  564 akapila                  1473 GIC         374 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
  564 akapila                  1474 ECB             :                                             pubrel->prrelid);
                               1475                 : 
  564 akapila                  1476 GIC         374 :     InvalidatePublicationRels(relids);
 2271 peter_e                  1477 ECB             : 
 2258 tgl                      1478 GIC         374 :     CatalogTupleDelete(rel, &tup->t_self);
 2271 peter_e                  1479 ECB             : 
 2271 peter_e                  1480 GIC         374 :     ReleaseSysCache(tup);
 2271 peter_e                  1481 ECB             : 
 1539 andres                   1482 GIC         374 :     table_close(rel, RowExclusiveLock);
 2271 peter_e                  1483 CBC         374 : }
 2271 peter_e                  1484 ECB             : 
                               1485                 : /*
                               1486                 :  * Remove the publication by mapping OID.
                               1487                 :  */
                               1488                 : void
  578 akapila                  1489 GIC         178 : RemovePublicationById(Oid pubid)
  578 akapila                  1490 ECB             : {
                               1491                 :     Relation    rel;
                               1492                 :     HeapTuple   tup;
                               1493                 :     Form_pg_publication pubform;
                               1494                 : 
  578 akapila                  1495 GIC         178 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
  578 akapila                  1496 ECB             : 
  578 akapila                  1497 GIC         178 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
  578 akapila                  1498 CBC         178 :     if (!HeapTupleIsValid(tup))
  578 akapila                  1499 LBC           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
  578 akapila                  1500 EUB             : 
  465 alvherre                 1501 GIC         178 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
  578 akapila                  1502 ECB             : 
                               1503                 :     /* Invalidate relcache so that publication info is rebuilt. */
  367 tomas.vondra             1504 GIC         178 :     if (pubform->puballtables)
  578 akapila                  1505 CBC          10 :         CacheInvalidateRelcacheAll();
  578 akapila                  1506 ECB             : 
  578 akapila                  1507 GIC         178 :     CatalogTupleDelete(rel, &tup->t_self);
  578 akapila                  1508 ECB             : 
  578 akapila                  1509 GIC         178 :     ReleaseSysCache(tup);
  578 akapila                  1510 ECB             : 
  578 akapila                  1511 GIC         178 :     table_close(rel, RowExclusiveLock);
  578 akapila                  1512 CBC         178 : }
  578 akapila                  1513 ECB             : 
                               1514                 : /*
                               1515                 :  * Remove schema from publication by mapping OID.
                               1516                 :  */
                               1517                 : void
  529 akapila                  1518 GIC          96 : RemovePublicationSchemaById(Oid psoid)
  529 akapila                  1519 ECB             : {
                               1520                 :     Relation    rel;
                               1521                 :     HeapTuple   tup;
  529 akapila                  1522 GIC          96 :     List       *schemaRels = NIL;
  529 akapila                  1523 ECB             :     Form_pg_publication_namespace pubsch;
                               1524                 : 
  529 akapila                  1525 GIC          96 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
  529 akapila                  1526 ECB             : 
  529 akapila                  1527 GIC          96 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
  529 akapila                  1528 ECB             : 
  529 akapila                  1529 GIC          96 :     if (!HeapTupleIsValid(tup))
  529 akapila                  1530 LBC           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
  529 akapila                  1531 EUB             : 
  529 akapila                  1532 GIC          96 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
  529 akapila                  1533 ECB             : 
                               1534                 :     /*
                               1535                 :      * Invalidate relcache so that publication info is rebuilt. See
                               1536                 :      * RemovePublicationRelById for why we need to consider all the
                               1537                 :      * partitions.
                               1538                 :      */
  529 akapila                  1539 GIC          96 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
  529 akapila                  1540 ECB             :                                                PUBLICATION_PART_ALL);
  529 akapila                  1541 GIC          96 :     InvalidatePublicationRels(schemaRels);
  529 akapila                  1542 ECB             : 
  529 akapila                  1543 GIC          96 :     CatalogTupleDelete(rel, &tup->t_self);
  529 akapila                  1544 ECB             : 
  529 akapila                  1545 GIC          96 :     ReleaseSysCache(tup);
  529 akapila                  1546 ECB             : 
  529 akapila                  1547 GIC          96 :     table_close(rel, RowExclusiveLock);
  529 akapila                  1548 CBC          96 : }
  529 akapila                  1549 ECB             : 
                               1550                 : /*
                               1551                 :  * Open relations specified by a PublicationTable list.
                               1552                 :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
                               1553                 :  * add them to a publication.
                               1554                 :  */
                               1555                 : static List *
  367 tomas.vondra             1556 GIC         569 : OpenTableList(List *tables)
 2271 peter_e                  1557 ECB             : {
 2271 peter_e                  1558 GIC         569 :     List       *relids = NIL;
  367 tomas.vondra             1559 CBC         569 :     List       *rels = NIL;
 2271 peter_e                  1560 ECB             :     ListCell   *lc;
  411 akapila                  1561 GIC         569 :     List       *relids_with_rf = NIL;
  379 tomas.vondra             1562 CBC         569 :     List       *relids_with_collist = NIL;
 2271 peter_e                  1563 ECB             : 
                               1564                 :     /*
                               1565                 :      * Open, share-lock, and check all the explicitly-specified relations
                               1566                 :      */
  367 tomas.vondra             1567 GIC        1167 :     foreach(lc, tables)
 2271 peter_e                  1568 ECB             :     {
  580 alvherre                 1569 GIC         610 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
  580 alvherre                 1570 CBC         610 :         bool        recurse = t->relation->inh;
 1584 tgl                      1571 ECB             :         Relation    rel;
                               1572                 :         Oid         myrelid;
                               1573                 :         PublicationRelInfo *pub_rel;
                               1574                 : 
                               1575                 :         /* Allow query cancel in case this takes a long time */
 2271 peter_e                  1576 GIC         610 :         CHECK_FOR_INTERRUPTS();
 2271 peter_e                  1577 ECB             : 
  580 alvherre                 1578 GIC         610 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
 2271 peter_e                  1579 CBC         610 :         myrelid = RelationGetRelid(rel);
 2154 tgl                      1580 ECB             : 
                               1581                 :         /*
                               1582                 :          * Filter out duplicates if user specifies "foo, foo".
                               1583                 :          *
                               1584                 :          * Note that this algorithm is known to not be very efficient (O(N^2))
                               1585                 :          * but given that it only works on list of tables given to us by user
                               1586                 :          * it's deemed acceptable.
                               1587                 :          */
 2271 peter_e                  1588 GIC         610 :         if (list_member_oid(relids, myrelid))
 2271 peter_e                  1589 ECB             :         {
                               1590                 :             /* Disallow duplicate tables if there are any with row filters. */
  411 akapila                  1591 GIC          12 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
  411 akapila                  1592 CBC           6 :                 ereport(ERROR,
  411 akapila                  1593 ECB             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1594                 :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
                               1595                 :                                 RelationGetRelationName(rel))));
                               1596                 : 
                               1597                 :             /* Disallow duplicate tables if there are any with column lists. */
  379 tomas.vondra             1598 GIC           6 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
  379 tomas.vondra             1599 CBC           6 :                 ereport(ERROR,
  379 tomas.vondra             1600 ECB             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1601                 :                          errmsg("conflicting or redundant column lists for table \"%s\"",
                               1602                 :                                 RelationGetRelationName(rel))));
                               1603                 : 
 1539 andres                   1604 UIC           0 :             table_close(rel, ShareUpdateExclusiveLock);
 2271 peter_e                  1605 UBC           0 :             continue;
 2271 peter_e                  1606 EUB             :         }
                               1607                 : 
  580 alvherre                 1608 GIC         598 :         pub_rel = palloc(sizeof(PublicationRelInfo));
  580 alvherre                 1609 CBC         598 :         pub_rel->relation = rel;
  411 akapila                  1610             598 :         pub_rel->whereClause = t->whereClause;
  379 tomas.vondra             1611             598 :         pub_rel->columns = t->columns;
  367                          1612             598 :         rels = lappend(rels, pub_rel);
 2271 peter_e                  1613             598 :         relids = lappend_oid(relids, myrelid);
 2271 peter_e                  1614 ECB             : 
  411 akapila                  1615 GIC         598 :         if (t->whereClause)
  411 akapila                  1616 CBC         186 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
  411 akapila                  1617 ECB             : 
  379 tomas.vondra             1618 GIC         598 :         if (t->columns)
  379 tomas.vondra             1619 CBC         166 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
  379 tomas.vondra             1620 ECB             : 
                               1621                 :         /*
                               1622                 :          * Add children of this rel, if requested, so that they too are added
                               1623                 :          * to the publication.  A partitioned table can't have any inheritance
                               1624                 :          * children other than its partitions, which need not be explicitly
                               1625                 :          * added to the publication.
                               1626                 :          */
 1125 peter                    1627 GIC         598 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
 2271 peter_e                  1628 ECB             :         {
                               1629                 :             List       *children;
                               1630                 :             ListCell   *child;
                               1631                 : 
 2271 peter_e                  1632 GIC         507 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
 2271 peter_e                  1633 ECB             :                                            NULL);
                               1634                 : 
 2271 peter_e                  1635 GIC        1018 :             foreach(child, children)
 2271 peter_e                  1636 ECB             :             {
 2271 peter_e                  1637 GIC         511 :                 Oid         childrelid = lfirst_oid(child);
 2271 peter_e                  1638 ECB             : 
                               1639                 :                 /* Allow query cancel in case this takes a long time */
 1584 tgl                      1640 GIC         511 :                 CHECK_FOR_INTERRUPTS();
 2271 peter_e                  1641 ECB             : 
                               1642                 :                 /*
                               1643                 :                  * Skip duplicates if user specified both parent and child
                               1644                 :                  * tables.
                               1645                 :                  */
 2271 peter_e                  1646 GIC         511 :                 if (list_member_oid(relids, childrelid))
  411 akapila                  1647 ECB             :                 {
                               1648                 :                     /*
                               1649                 :                      * We don't allow to specify row filter for both parent
                               1650                 :                      * and child table at the same time as it is not very
                               1651                 :                      * clear which one should be given preference.
                               1652                 :                      */
  411 akapila                  1653 GIC         507 :                     if (childrelid != myrelid &&
  411 akapila                  1654 LBC           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
  411 akapila                  1655 UBC           0 :                         ereport(ERROR,
  411 akapila                  1656 EUB             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1657                 :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
                               1658                 :                                         RelationGetRelationName(rel))));
                               1659                 : 
                               1660                 :                     /*
                               1661                 :                      * We don't allow to specify column list for both parent
                               1662                 :                      * and child table at the same time as it is not very
                               1663                 :                      * clear which one should be given preference.
                               1664                 :                      */
  379 tomas.vondra             1665 GIC         507 :                     if (childrelid != myrelid &&
  379 tomas.vondra             1666 LBC           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
  379 tomas.vondra             1667 UBC           0 :                         ereport(ERROR,
  379 tomas.vondra             1668 EUB             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1669                 :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
                               1670                 :                                         RelationGetRelationName(rel))));
                               1671                 : 
 2271 peter_e                  1672 GIC         507 :                     continue;
  411 akapila                  1673 ECB             :                 }
                               1674                 : 
                               1675                 :                 /* find_all_inheritors already got lock */
 1539 andres                   1676 GIC           4 :                 rel = table_open(childrelid, NoLock);
  580 alvherre                 1677 CBC           4 :                 pub_rel = palloc(sizeof(PublicationRelInfo));
                               1678               4 :                 pub_rel->relation = rel;
  411 akapila                  1679 ECB             :                 /* child inherits WHERE clause from parent */
  411 akapila                  1680 GIC           4 :                 pub_rel->whereClause = t->whereClause;
  367 tomas.vondra             1681 ECB             : 
                               1682                 :                 /* child inherits column list from parent */
  379 tomas.vondra             1683 GIC           4 :                 pub_rel->columns = t->columns;
  367 tomas.vondra             1684 CBC           4 :                 rels = lappend(rels, pub_rel);
 2271 peter_e                  1685               4 :                 relids = lappend_oid(relids, childrelid);
  411 akapila                  1686 ECB             : 
  411 akapila                  1687 GIC           4 :                 if (t->whereClause)
  411 akapila                  1688 CBC           1 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
  379 tomas.vondra             1689 ECB             : 
  379 tomas.vondra             1690 GIC           4 :                 if (t->columns)
  379 tomas.vondra             1691 LBC           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
 2271 peter_e                  1692 EUB             :             }
                               1693                 :         }
                               1694                 :     }
                               1695                 : 
 2271 peter_e                  1696 GIC         557 :     list_free(relids);
  411 akapila                  1697 CBC         557 :     list_free(relids_with_rf);
 2271 peter_e                  1698 ECB             : 
  367 tomas.vondra             1699 GIC         557 :     return rels;
 2271 peter_e                  1700 ECB             : }
                               1701                 : 
                               1702                 : /*
                               1703                 :  * Close all relations in the list.
                               1704                 :  */
                               1705                 : static void
  367 tomas.vondra             1706 GIC         668 : CloseTableList(List *rels)
 2271 peter_e                  1707 ECB             : {
                               1708                 :     ListCell   *lc;
                               1709                 : 
 2271 peter_e                  1710 GIC        1354 :     foreach(lc, rels)
 2271 peter_e                  1711 ECB             :     {
                               1712                 :         PublicationRelInfo *pub_rel;
                               1713                 : 
  580 alvherre                 1714 GIC         686 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
  580 alvherre                 1715 CBC         686 :         table_close(pub_rel->relation, NoLock);
 2271 peter_e                  1716 ECB             :     }
                               1717                 : 
  411 akapila                  1718 GIC         668 :     list_free_deep(rels);
 2271 peter_e                  1719 CBC         668 : }
 2271 peter_e                  1720 ECB             : 
                               1721                 : /*
                               1722                 :  * Lock the schemas specified in the schema list in AccessShareLock mode in
                               1723                 :  * order to prevent concurrent schema deletion.
                               1724                 :  */
                               1725                 : static void
  529 akapila                  1726 GIC         492 : LockSchemaList(List *schemalist)
  529 akapila                  1727 ECB             : {
                               1728                 :     ListCell   *lc;
                               1729                 : 
  529 akapila                  1730 GIC         634 :     foreach(lc, schemalist)
  529 akapila                  1731 ECB             :     {
  529 akapila                  1732 GIC         142 :         Oid         schemaid = lfirst_oid(lc);
  529 akapila                  1733 ECB             : 
                               1734                 :         /* Allow query cancel in case this takes a long time */
  529 akapila                  1735 GIC         142 :         CHECK_FOR_INTERRUPTS();
  529 akapila                  1736 CBC         142 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
  529 akapila                  1737 ECB             : 
                               1738                 :         /*
                               1739                 :          * It is possible that by the time we acquire the lock on schema,
                               1740                 :          * concurrent DDL has removed it. We can test this by checking the
                               1741                 :          * existence of schema.
                               1742                 :          */
  529 akapila                  1743 GIC         142 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
  529 akapila                  1744 LBC           0 :             ereport(ERROR,
  529 akapila                  1745 EUB             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
                               1746                 :                     errmsg("schema with OID %u does not exist", schemaid));
                               1747                 :     }
  529 akapila                  1748 GIC         492 : }
  529 akapila                  1749 ECB             : 
                               1750                 : /*
                               1751                 :  * Add listed tables to the publication.
                               1752                 :  */
                               1753                 : static void
  367 tomas.vondra             1754 GIC         466 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 2271 peter_e                  1755 ECB             :                      AlterPublicationStmt *stmt)
                               1756                 : {
                               1757                 :     ListCell   *lc;
                               1758                 : 
  367 tomas.vondra             1759 GIC         466 :     Assert(!stmt || !stmt->for_all_tables);
 2271 peter_e                  1760 ECB             : 
 2271 peter_e                  1761 GIC         931 :     foreach(lc, rels)
 2271 peter_e                  1762 ECB             :     {
  580 alvherre                 1763 GIC         496 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
  580 alvherre                 1764 CBC         496 :         Relation    rel = pub_rel->relation;
 2153 bruce                    1765 ECB             :         ObjectAddress obj;
                               1766                 : 
                               1767                 :         /* Must be owner of the table or superuser. */
  147 peter                    1768 GNC         496 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
 1954 peter_e                  1769 CBC           3 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
 2271                          1770               3 :                            RelationGetRelationName(rel));
 2271 peter_e                  1771 ECB             : 
  580 alvherre                 1772 GIC         493 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
 2271 peter_e                  1773 CBC         465 :         if (stmt)
 2271 peter_e                  1774 ECB             :         {
 2271 peter_e                  1775 GIC         293 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
 2271 peter_e                  1776 ECB             :                                              (Node *) stmt);
                               1777                 : 
 2271 peter_e                  1778 GIC         293 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
 2271 peter_e                  1779 ECB             :                                        obj.objectId, 0);
                               1780                 :         }
                               1781                 :     }
 2271 peter_e                  1782 GIC         435 : }
 2271 peter_e                  1783 ECB             : 
                               1784                 : /*
                               1785                 :  * Remove listed tables from the publication.
                               1786                 :  */
                               1787                 : static void
  367 tomas.vondra             1788 GIC         242 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 2271 peter_e                  1789 ECB             : {
                               1790                 :     ObjectAddress obj;
                               1791                 :     ListCell   *lc;
                               1792                 :     Oid         prid;
                               1793                 : 
 2271 peter_e                  1794 GIC         463 :     foreach(lc, rels)
 2271 peter_e                  1795 ECB             :     {
  580 alvherre                 1796 GIC         230 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
  580 alvherre                 1797 CBC         230 :         Relation    rel = pubrel->relation;
 2271 peter_e                  1798             230 :         Oid         relid = RelationGetRelid(rel);
 2271 peter_e                  1799 ECB             : 
  379 tomas.vondra             1800 GIC         230 :         if (pubrel->columns)
  379 tomas.vondra             1801 LBC           0 :             ereport(ERROR,
  379 tomas.vondra             1802 EUB             :                     errcode(ERRCODE_SYNTAX_ERROR),
                               1803                 :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
                               1804                 : 
 1601 andres                   1805 GIC         230 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 1601 andres                   1806 ECB             :                                ObjectIdGetDatum(relid),
                               1807                 :                                ObjectIdGetDatum(pubid));
 2271 peter_e                  1808 GIC         230 :         if (!OidIsValid(prid))
 2271 peter_e                  1809 ECB             :         {
 2271 peter_e                  1810 GIC           6 :             if (missing_ok)
 2271 peter_e                  1811 LBC           0 :                 continue;
 2271 peter_e                  1812 EUB             : 
 2271 peter_e                  1813 GIC           6 :             ereport(ERROR,
 2271 peter_e                  1814 ECB             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
                               1815                 :                      errmsg("relation \"%s\" is not part of the publication",
                               1816                 :                             RelationGetRelationName(rel))));
                               1817                 :         }
                               1818                 : 
  411 akapila                  1819 GIC         224 :         if (pubrel->whereClause)
  411 akapila                  1820 CBC           3 :             ereport(ERROR,
  411 akapila                  1821 ECB             :                     (errcode(ERRCODE_SYNTAX_ERROR),
                               1822                 :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
                               1823                 : 
 2271 peter_e                  1824 GIC         221 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
 2271 peter_e                  1825 CBC         221 :         performDeletion(&obj, DROP_CASCADE, 0);
 2271 peter_e                  1826 ECB             :     }
 2271 peter_e                  1827 GIC         233 : }
 2271 peter_e                  1828 ECB             : 
                               1829                 : /*
                               1830                 :  * Add listed schemas to the publication.
                               1831                 :  */
                               1832                 : static void
  367 tomas.vondra             1833 GIC         274 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
  367 tomas.vondra             1834 ECB             :                       AlterPublicationStmt *stmt)
                               1835                 : {
                               1836                 :     ListCell   *lc;
                               1837                 : 
  367 tomas.vondra             1838 GIC         274 :     Assert(!stmt || !stmt->for_all_tables);
  529 akapila                  1839 ECB             : 
  529 akapila                  1840 GIC         379 :     foreach(lc, schemas)
  529 akapila                  1841 ECB             :     {
  529 akapila                  1842 GIC         111 :         Oid         schemaid = lfirst_oid(lc);
  529 akapila                  1843 ECB             :         ObjectAddress obj;
                               1844                 : 
  367 tomas.vondra             1845 GIC         111 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
  529 akapila                  1846 CBC         105 :         if (stmt)
  529 akapila                  1847 ECB             :         {
  529 akapila                  1848 GIC          29 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
  529 akapila                  1849 ECB             :                                              (Node *) stmt);
                               1850                 : 
  529 akapila                  1851 GIC          29 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
  529 akapila                  1852 ECB             :                                        obj.objectId, 0);
                               1853                 :         }
                               1854                 :     }
  529 akapila                  1855 GIC         268 : }
  529 akapila                  1856 ECB             : 
                               1857                 : /*
                               1858                 :  * Remove listed schemas from the publication.
                               1859                 :  */
                               1860                 : static void
  367 tomas.vondra             1861 GIC         215 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
  529 akapila                  1862 ECB             : {
                               1863                 :     ObjectAddress obj;
                               1864                 :     ListCell   *lc;
                               1865                 :     Oid         psid;
                               1866                 : 
  529 akapila                  1867 GIC         240 :     foreach(lc, schemas)
  529 akapila                  1868 ECB             :     {
  529 akapila                  1869 GIC          28 :         Oid         schemaid = lfirst_oid(lc);
  529 akapila                  1870 ECB             : 
  367 tomas.vondra             1871 GIC          28 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
  529 akapila                  1872 ECB             :                                Anum_pg_publication_namespace_oid,
                               1873                 :                                ObjectIdGetDatum(schemaid),
                               1874                 :                                ObjectIdGetDatum(pubid));
  529 akapila                  1875 GIC          28 :         if (!OidIsValid(psid))
  529 akapila                  1876 ECB             :         {
  529 akapila                  1877 GIC           3 :             if (missing_ok)
  529 akapila                  1878 LBC           0 :                 continue;
  529 akapila                  1879 EUB             : 
  529 akapila                  1880 GIC           3 :             ereport(ERROR,
  529 akapila                  1881 ECB             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
                               1882                 :                      errmsg("tables from schema \"%s\" are not part of the publication",
                               1883                 :                             get_namespace_name(schemaid))));
                               1884                 :         }
                               1885                 : 
  529 akapila                  1886 GIC          25 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
  529 akapila                  1887 CBC          25 :         performDeletion(&obj, DROP_CASCADE, 0);
  529 akapila                  1888 ECB             :     }
  529 akapila                  1889 GIC         212 : }
  529 akapila                  1890 ECB             : 
                               1891                 : /*
                               1892                 :  * Internal workhorse for changing a publication owner
                               1893                 :  */
                               1894                 : static void
 2271 peter_e                  1895 GIC          12 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 2271 peter_e                  1896 ECB             : {
                               1897                 :     Form_pg_publication form;
                               1898                 : 
 2271 peter_e                  1899 GIC          12 :     form = (Form_pg_publication) GETSTRUCT(tup);
 2271 peter_e                  1900 ECB             : 
 2271 peter_e                  1901 GIC          12 :     if (form->pubowner == newOwnerId)
 2271 peter_e                  1902 LBC           0 :         return;
 2271 peter_e                  1903 EUB             : 
 2246 peter_e                  1904 GIC          12 :     if (!superuser())
 2246 peter_e                  1905 ECB             :     {
                               1906                 :         AclResult   aclresult;
                               1907                 : 
                               1908                 :         /* Must be owner */
  147 peter                    1909 GNC           6 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
 1954 peter_e                  1910 LBC           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
 2246 peter_e                  1911 UBC           0 :                            NameStr(form->pubname));
 2246 peter_e                  1912 EUB             : 
                               1913                 :         /* Must be able to become new owner */
  142 rhaas                    1914 GNC           6 :         check_can_set_role(GetUserId(), newOwnerId);
 2246 peter_e                  1915 ECB             : 
                               1916                 :         /* New owner must have CREATE privilege on database */
  147 peter                    1917 GNC           6 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
 2246 peter_e                  1918 CBC           6 :         if (aclresult != ACLCHECK_OK)
 1954 peter_e                  1919 LBC           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
 2246 peter_e                  1920 UBC           0 :                            get_database_name(MyDatabaseId));
 2246 peter_e                  1921 EUB             : 
 2246 peter_e                  1922 GIC           6 :         if (form->puballtables && !superuser_arg(newOwnerId))
 2246 peter_e                  1923 LBC           0 :             ereport(ERROR,
 2246 peter_e                  1924 EUB             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                               1925                 :                      errmsg("permission denied to change owner of publication \"%s\"",
                               1926                 :                             NameStr(form->pubname)),
                               1927                 :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
                               1928                 : 
  487 akapila                  1929 GIC           6 :         if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
  487 akapila                  1930 CBC           3 :             ereport(ERROR,
  487 akapila                  1931 ECB             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                               1932                 :                      errmsg("permission denied to change owner of publication \"%s\"",
                               1933                 :                             NameStr(form->pubname)),
                               1934                 :                      errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
                               1935                 :     }
                               1936                 : 
 2271 peter_e                  1937 GIC           9 :     form->pubowner = newOwnerId;
 2259 alvherre                 1938 CBC           9 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
 2271 peter_e                  1939 ECB             : 
                               1940                 :     /* Update owner dependency reference */
 2271 peter_e                  1941 GIC           9 :     changeDependencyOnOwner(PublicationRelationId,
 1601 andres                   1942 ECB             :                             form->oid,
                               1943                 :                             newOwnerId);
                               1944                 : 
 2271 peter_e                  1945 GIC           9 :     InvokeObjectPostAlterHook(PublicationRelationId,
 1601 andres                   1946 ECB             :                               form->oid, 0);
                               1947                 : }
                               1948                 : 
                               1949                 : /*
                               1950                 :  * Change publication owner -- by name
                               1951                 :  */
                               1952                 : ObjectAddress
 2271 peter_e                  1953 GIC          12 : AlterPublicationOwner(const char *name, Oid newOwnerId)
 2271 peter_e                  1954 ECB             : {
                               1955                 :     Oid         subid;
                               1956                 :     HeapTuple   tup;
                               1957                 :     Relation    rel;
                               1958                 :     ObjectAddress address;
                               1959                 :     Form_pg_publication pubform;
                               1960                 : 
 1539 andres                   1961 GIC          12 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
 2271 peter_e                  1962 ECB             : 
 2271 peter_e                  1963 GIC          12 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
 2271 peter_e                  1964 ECB             : 
 2271 peter_e                  1965 GIC          12 :     if (!HeapTupleIsValid(tup))
 2271 peter_e                  1966 LBC           0 :         ereport(ERROR,
 2271 peter_e                  1967 EUB             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                               1968                 :                  errmsg("publication \"%s\" does not exist", name)));
                               1969                 : 
 1601 andres                   1970 GIC          12 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
 1601 andres                   1971 CBC          12 :     subid = pubform->oid;
 2271 peter_e                  1972 ECB             : 
 2271 peter_e                  1973 GIC          12 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
 2271 peter_e                  1974 ECB             : 
 2271 peter_e                  1975 GIC           9 :     ObjectAddressSet(address, PublicationRelationId, subid);
 2271 peter_e                  1976 ECB             : 
 2271 peter_e                  1977 GIC           9 :     heap_freetuple(tup);
 2271 peter_e                  1978 ECB             : 
 1539 andres                   1979 GIC           9 :     table_close(rel, RowExclusiveLock);
 2271 peter_e                  1980 ECB             : 
 2271 peter_e                  1981 GIC           9 :     return address;
 2271 peter_e                  1982 ECB             : }
                               1983                 : 
                               1984                 : /*
                               1985                 :  * Change publication owner -- by OID
                               1986                 :  */
                               1987                 : void
 2271 peter_e                  1988 UIC           0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
 2271 peter_e                  1989 EUB             : {
                               1990                 :     HeapTuple   tup;
                               1991                 :     Relation    rel;
                               1992                 : 
 1539 andres                   1993 UIC           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
 2271 peter_e                  1994 EUB             : 
 2271 peter_e                  1995 UIC           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
 2271 peter_e                  1996 EUB             : 
 2271 peter_e                  1997 UIC           0 :     if (!HeapTupleIsValid(tup))
 2271 peter_e                  1998 UBC           0 :         ereport(ERROR,
 2271 peter_e                  1999 EUB             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                               2000                 :                  errmsg("publication with OID %u does not exist", subid)));
                               2001                 : 
 2271 peter_e                  2002 UIC           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
 2271 peter_e                  2003 EUB             : 
 2271 peter_e                  2004 UIC           0 :     heap_freetuple(tup);
 2271 peter_e                  2005 EUB             : 
 1539 andres                   2006 UIC           0 :     table_close(rel, RowExclusiveLock);
 2271 peter_e                  2007 UBC           0 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a