Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/htup_details.h"
19 : #include "access/table.h"
20 : #include "access/xact.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/indexing.h"
23 : #include "catalog/namespace.h"
24 : #include "catalog/objectaccess.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/partition.h"
27 : #include "catalog/pg_database.h"
28 : #include "catalog/pg_inherits.h"
29 : #include "catalog/pg_namespace.h"
30 : #include "catalog/pg_proc.h"
31 : #include "catalog/pg_publication.h"
32 : #include "catalog/pg_publication_namespace.h"
33 : #include "catalog/pg_publication_rel.h"
34 : #include "catalog/pg_type.h"
35 : #include "commands/dbcommands.h"
36 : #include "commands/defrem.h"
37 : #include "commands/event_trigger.h"
38 : #include "commands/publicationcmds.h"
39 : #include "funcapi.h"
40 : #include "miscadmin.h"
41 : #include "nodes/nodeFuncs.h"
42 : #include "parser/parse_clause.h"
43 : #include "parser/parse_collate.h"
44 : #include "parser/parse_relation.h"
45 : #include "storage/lmgr.h"
46 : #include "utils/acl.h"
47 : #include "utils/array.h"
48 : #include "utils/builtins.h"
49 : #include "utils/catcache.h"
50 : #include "utils/fmgroids.h"
51 : #include "utils/inval.h"
52 : #include "utils/lsyscache.h"
53 : #include "utils/rel.h"
54 : #include "utils/syscache.h"
55 : #include "utils/varlena.h"
56 :
57 :
58 : /*
59 : * Information used to validate the columns in the row filter expression. See
60 : * contain_invalid_rfcolumn_walker for details.
61 : */
62 : typedef struct rf_context
63 : {
64 : Bitmapset *bms_replident; /* bitset of replica identity columns */
65 : bool pubviaroot; /* true if we are validating the parent
66 : * relation's row filter */
67 : Oid relid; /* relid of the relation */
68 : Oid parentid; /* relid of the parent relation */
69 : } rf_context;
70 :
71 : static List *OpenTableList(List *tables);
72 : static void CloseTableList(List *rels);
73 : static void LockSchemaList(List *schemalist);
74 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
75 : AlterPublicationStmt *stmt);
76 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
77 : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
78 : AlterPublicationStmt *stmt);
79 : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
80 :
81 :
82 : static void
633 dean.a.rasheed 83 GIC 374 : parse_publication_options(ParseState *pstate,
633 dean.a.rasheed 84 ECB : List *options,
85 : bool *publish_given,
86 : PublicationActions *pubactions,
87 : bool *publish_via_partition_root_given,
88 : bool *publish_via_partition_root)
89 : {
90 : ListCell *lc;
91 :
2158 peter_e 92 GIC 374 : *publish_given = false;
1096 peter 93 CBC 374 : *publish_via_partition_root_given = false;
2271 peter_e 94 ECB :
95 : /* defaults */
1096 peter 96 GIC 374 : pubactions->pubinsert = true;
1096 peter 97 CBC 374 : pubactions->pubupdate = true;
98 374 : pubactions->pubdelete = true;
99 374 : pubactions->pubtruncate = true;
100 374 : *publish_via_partition_root = false;
2271 peter_e 101 ECB :
102 : /* Parse options */
2153 bruce 103 GIC 509 : foreach(lc, options)
2271 peter_e 104 ECB : {
2271 peter_e 105 GIC 144 : DefElem *defel = (DefElem *) lfirst(lc);
2271 peter_e 106 ECB :
2158 peter_e 107 GIC 144 : if (strcmp(defel->defname, "publish") == 0)
2271 peter_e 108 ECB : {
109 : char *publish;
110 : List *publish_list;
111 : ListCell *lc2;
112 :
2158 peter_e 113 GIC 60 : if (*publish_given)
633 dean.a.rasheed 114 LBC 0 : errorConflictingDefElem(defel, pstate);
2271 peter_e 115 EUB :
116 : /*
117 : * If publish option was given only the explicitly listed actions
118 : * should be published.
119 : */
1096 peter 120 GIC 60 : pubactions->pubinsert = false;
1096 peter 121 CBC 60 : pubactions->pubupdate = false;
122 60 : pubactions->pubdelete = false;
123 60 : pubactions->pubtruncate = false;
2271 peter_e 124 ECB :
2158 peter_e 125 GIC 60 : *publish_given = true;
2158 peter_e 126 CBC 60 : publish = defGetString(defel);
2271 peter_e 127 ECB :
2158 peter_e 128 GIC 60 : if (!SplitIdentifierString(publish, ',', &publish_list))
2271 peter_e 129 LBC 0 : ereport(ERROR,
2271 peter_e 130 EUB : (errcode(ERRCODE_SYNTAX_ERROR),
131 : errmsg("invalid list syntax in parameter \"%s\"",
132 : "publish")));
133 :
134 : /* Process the option list. */
186 drowley 135 GNC 130 : foreach(lc2, publish_list)
2158 peter_e 136 ECB : {
186 drowley 137 GNC 73 : char *publish_opt = (char *) lfirst(lc2);
2158 peter_e 138 ECB :
2158 peter_e 139 GIC 73 : if (strcmp(publish_opt, "insert") == 0)
1096 peter 140 CBC 53 : pubactions->pubinsert = true;
2158 peter_e 141 20 : else if (strcmp(publish_opt, "update") == 0)
1096 peter 142 9 : pubactions->pubupdate = true;
2158 peter_e 143 11 : else if (strcmp(publish_opt, "delete") == 0)
1096 peter 144 4 : pubactions->pubdelete = true;
1828 peter_e 145 7 : else if (strcmp(publish_opt, "truncate") == 0)
1096 peter 146 4 : pubactions->pubtruncate = true;
2158 peter_e 147 ECB : else
2158 peter_e 148 GIC 3 : ereport(ERROR,
2158 peter_e 149 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
150 : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
151 : "publish", publish_opt)));
152 : }
153 : }
1096 peter 154 GIC 84 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
1096 peter 155 ECB : {
1096 peter 156 GIC 81 : if (*publish_via_partition_root_given)
633 dean.a.rasheed 157 CBC 3 : errorConflictingDefElem(defel, pstate);
1096 peter 158 78 : *publish_via_partition_root_given = true;
159 78 : *publish_via_partition_root = defGetBoolean(defel);
1096 peter 160 ECB : }
161 : else
2158 peter_e 162 GIC 3 : ereport(ERROR,
2158 peter_e 163 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
164 : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
165 : }
2271 peter_e 166 GIC 365 : }
2271 peter_e 167 ECB :
168 : /*
169 : * Convert the PublicationObjSpecType list into schema oid list and
170 : * PublicationTable list.
171 : */
172 : static void
529 akapila 173 GIC 737 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
367 tomas.vondra 174 ECB : List **rels, List **schemas)
175 : {
176 : ListCell *cell;
177 : PublicationObjSpec *pubobj;
178 :
529 akapila 179 GIC 737 : if (!pubobjspec_list)
529 akapila 180 CBC 39 : return;
529 akapila 181 ECB :
529 akapila 182 GIC 1477 : foreach(cell, pubobjspec_list)
529 akapila 183 ECB : {
184 : Oid schemaid;
185 : List *search_path;
186 :
529 akapila 187 GIC 797 : pubobj = (PublicationObjSpec *) lfirst(cell);
529 akapila 188 ECB :
529 akapila 189 GIC 797 : switch (pubobj->pubobjtype)
529 akapila 190 ECB : {
529 akapila 191 GIC 619 : case PUBLICATIONOBJ_TABLE:
367 tomas.vondra 192 CBC 619 : *rels = lappend(*rels, pubobj->pubtable);
529 akapila 193 619 : break;
465 alvherre 194 166 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
529 akapila 195 166 : schemaid = get_namespace_oid(pubobj->name, false);
529 akapila 196 ECB :
197 : /* Filter out duplicates if user specifies "sch1, sch1" */
367 tomas.vondra 198 GIC 151 : *schemas = list_append_unique_oid(*schemas, schemaid);
529 akapila 199 CBC 151 : break;
465 alvherre 200 12 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
529 akapila 201 12 : search_path = fetch_search_path(false);
202 12 : if (search_path == NIL) /* nothing valid in search_path? */
203 3 : ereport(ERROR,
529 akapila 204 ECB : errcode(ERRCODE_UNDEFINED_SCHEMA),
205 : errmsg("no schema has been selected for CURRENT_SCHEMA"));
206 :
529 akapila 207 GIC 9 : schemaid = linitial_oid(search_path);
529 akapila 208 CBC 9 : list_free(search_path);
529 akapila 209 ECB :
210 : /* Filter out duplicates if user specifies "sch1, sch1" */
367 tomas.vondra 211 GIC 9 : *schemas = list_append_unique_oid(*schemas, schemaid);
529 akapila 212 CBC 9 : break;
529 akapila 213 LBC 0 : default:
529 akapila 214 EUB : /* shouldn't happen */
529 akapila 215 UIC 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
529 akapila 216 EUB : break;
217 : }
218 : }
219 : }
220 :
221 : /*
222 : * Returns true if any of the columns used in the row filter WHERE expression is
223 : * not part of REPLICA IDENTITY, false otherwise.
224 : */
225 : static bool
411 akapila 226 GIC 129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
411 akapila 227 ECB : {
411 akapila 228 GIC 129 : if (node == NULL)
411 akapila 229 LBC 0 : return false;
411 akapila 230 EUB :
411 akapila 231 GIC 129 : if (IsA(node, Var))
411 akapila 232 ECB : {
411 akapila 233 GIC 52 : Var *var = (Var *) node;
411 akapila 234 CBC 52 : AttrNumber attnum = var->varattno;
411 akapila 235 ECB :
236 : /*
237 : * If pubviaroot is true, we are validating the row filter of the
238 : * parent table, but the bitmap contains the replica identity
239 : * information of the child table. So, get the column number of the
240 : * child table as parent and child column order could be different.
241 : */
411 akapila 242 GIC 52 : if (context->pubviaroot)
411 akapila 243 ECB : {
411 akapila 244 GIC 8 : char *colname = get_attname(context->parentid, attnum, false);
411 akapila 245 ECB :
411 akapila 246 GIC 8 : attnum = get_attnum(context->relid, colname);
411 akapila 247 ECB : }
248 :
411 akapila 249 GIC 52 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
411 akapila 250 CBC 52 : context->bms_replident))
251 30 : return true;
411 akapila 252 ECB : }
253 :
411 akapila 254 GIC 99 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
411 akapila 255 ECB : (void *) context);
256 : }
257 :
258 : /*
259 : * Check if all columns referenced in the filter expression are part of the
260 : * REPLICA IDENTITY index or not.
261 : *
262 : * Returns true if any invalid column is found.
263 : */
264 : bool
379 tomas.vondra 265 GIC 321 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
332 tgl 266 ECB : bool pubviaroot)
267 : {
268 : HeapTuple rftuple;
411 akapila 269 GIC 321 : Oid relid = RelationGetRelid(relation);
411 akapila 270 CBC 321 : Oid publish_as_relid = RelationGetRelid(relation);
271 321 : bool result = false;
411 akapila 272 ECB : Datum rfdatum;
273 : bool rfisnull;
274 :
275 : /*
276 : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
277 : * allowed in the row filter and we can skip the validation.
278 : */
411 akapila 279 GIC 321 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
411 akapila 280 CBC 72 : return false;
411 akapila 281 ECB :
282 : /*
283 : * For a partition, if pubviaroot is true, find the topmost ancestor that
284 : * is published via this publication as we need to use its row filter
285 : * expression to filter the partition's changes.
286 : *
287 : * Note that even though the row filter used is for an ancestor, the
288 : * REPLICA IDENTITY used will be for the actual child table.
289 : */
411 akapila 290 GIC 249 : if (pubviaroot && relation->rd_rel->relispartition)
411 akapila 291 ECB : {
292 : publish_as_relid
389 tomas.vondra 293 GIC 53 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
411 akapila 294 ECB :
411 akapila 295 GIC 53 : if (!OidIsValid(publish_as_relid))
411 akapila 296 CBC 3 : publish_as_relid = relid;
411 akapila 297 ECB : }
298 :
411 akapila 299 GIC 249 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
411 akapila 300 ECB : ObjectIdGetDatum(publish_as_relid),
301 : ObjectIdGetDatum(pubid));
302 :
411 akapila 303 GIC 249 : if (!HeapTupleIsValid(rftuple))
411 akapila 304 CBC 28 : return false;
411 akapila 305 ECB :
411 akapila 306 GIC 221 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
411 akapila 307 ECB : Anum_pg_publication_rel_prqual,
308 : &rfisnull);
309 :
411 akapila 310 GIC 221 : if (!rfisnull)
411 akapila 311 ECB : {
411 akapila 312 GIC 51 : rf_context context = {0};
411 akapila 313 ECB : Node *rfnode;
411 akapila 314 GIC 51 : Bitmapset *bms = NULL;
411 akapila 315 ECB :
411 akapila 316 GIC 51 : context.pubviaroot = pubviaroot;
411 akapila 317 CBC 51 : context.parentid = publish_as_relid;
318 51 : context.relid = relid;
411 akapila 319 ECB :
320 : /* Remember columns that are part of the REPLICA IDENTITY */
411 akapila 321 GIC 51 : bms = RelationGetIndexAttrBitmap(relation,
411 akapila 322 ECB : INDEX_ATTR_BITMAP_IDENTITY_KEY);
323 :
411 akapila 324 GIC 51 : context.bms_replident = bms;
411 akapila 325 CBC 51 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
326 51 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
411 akapila 327 ECB : }
328 :
411 akapila 329 GIC 221 : ReleaseSysCache(rftuple);
411 akapila 330 ECB :
411 akapila 331 GIC 221 : return result;
411 akapila 332 ECB : }
333 :
334 : /*
335 : * Check if all columns referenced in the REPLICA IDENTITY are covered by
336 : * the column list.
337 : *
338 : * Returns true if any replica identity column is not covered by column list.
339 : */
340 : bool
379 tomas.vondra 341 GIC 321 : pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
332 tgl 342 ECB : bool pubviaroot)
343 : {
344 : HeapTuple tuple;
379 tomas.vondra 345 GIC 321 : Oid relid = RelationGetRelid(relation);
379 tomas.vondra 346 CBC 321 : Oid publish_as_relid = RelationGetRelid(relation);
347 321 : bool result = false;
379 tomas.vondra 348 ECB : Datum datum;
349 : bool isnull;
350 :
351 : /*
352 : * For a partition, if pubviaroot is true, find the topmost ancestor that
353 : * is published via this publication as we need to use its column list for
354 : * the changes.
355 : *
356 : * Note that even though the column list used is for an ancestor, the
357 : * REPLICA IDENTITY used will be for the actual child table.
358 : */
379 tomas.vondra 359 GIC 321 : if (pubviaroot && relation->rd_rel->relispartition)
379 tomas.vondra 360 ECB : {
379 tomas.vondra 361 GIC 64 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
379 tomas.vondra 362 ECB :
379 tomas.vondra 363 GIC 64 : if (!OidIsValid(publish_as_relid))
379 tomas.vondra 364 CBC 3 : publish_as_relid = relid;
379 tomas.vondra 365 ECB : }
366 :
379 tomas.vondra 367 GIC 321 : tuple = SearchSysCache2(PUBLICATIONRELMAP,
332 tgl 368 ECB : ObjectIdGetDatum(publish_as_relid),
369 : ObjectIdGetDatum(pubid));
370 :
379 tomas.vondra 371 GIC 321 : if (!HeapTupleIsValid(tuple))
379 tomas.vondra 372 CBC 32 : return false;
379 tomas.vondra 373 ECB :
379 tomas.vondra 374 GIC 289 : datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
332 tgl 375 ECB : Anum_pg_publication_rel_prattrs,
376 : &isnull);
377 :
379 tomas.vondra 378 GIC 289 : if (!isnull)
379 tomas.vondra 379 ECB : {
380 : int x;
381 : Bitmapset *idattrs;
379 tomas.vondra 382 GIC 115 : Bitmapset *columns = NULL;
379 tomas.vondra 383 ECB :
384 : /* With REPLICA IDENTITY FULL, no column list is allowed. */
379 tomas.vondra 385 GIC 115 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
379 tomas.vondra 386 CBC 18 : result = true;
379 tomas.vondra 387 ECB :
388 : /* Transform the column list datum to a bitmapset. */
379 tomas.vondra 389 GIC 115 : columns = pub_collist_to_bitmapset(NULL, datum, NULL);
379 tomas.vondra 390 ECB :
391 : /* Remember columns that are part of the REPLICA IDENTITY */
379 tomas.vondra 392 GIC 115 : idattrs = RelationGetIndexAttrBitmap(relation,
379 tomas.vondra 393 ECB : INDEX_ATTR_BITMAP_IDENTITY_KEY);
394 :
395 : /*
396 : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
397 : * offset (to handle system columns the usual way), while column list
398 : * does not use offset, so we can't do bms_is_subset(). Instead, we
399 : * have to loop over the idattrs and check all of them are in the
400 : * list.
401 : */
379 tomas.vondra 402 GIC 115 : x = -1;
379 tomas.vondra 403 CBC 175 : while ((x = bms_next_member(idattrs, x)) >= 0)
379 tomas.vondra 404 ECB : {
379 tomas.vondra 405 GIC 96 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
379 tomas.vondra 406 ECB :
407 : /*
408 : * If pubviaroot is true, we are validating the column list of the
409 : * parent table, but the bitmap contains the replica identity
410 : * information of the child table. The parent/child attnums may
411 : * not match, so translate them to the parent - get the attname
412 : * from the child, and look it up in the parent.
413 : */
379 tomas.vondra 414 GIC 96 : if (pubviaroot)
379 tomas.vondra 415 ECB : {
416 : /* attribute name in the child table */
332 tgl 417 GIC 40 : char *colname = get_attname(relid, attnum, false);
379 tomas.vondra 418 ECB :
419 : /*
420 : * Determine the attnum for the attribute name in parent (we
421 : * are using the column list defined on the parent).
422 : */
379 tomas.vondra 423 GIC 40 : attnum = get_attnum(publish_as_relid, colname);
379 tomas.vondra 424 ECB : }
425 :
426 : /* replica identity column, not covered by the column list */
379 tomas.vondra 427 GIC 96 : if (!bms_is_member(attnum, columns))
379 tomas.vondra 428 ECB : {
379 tomas.vondra 429 GIC 36 : result = true;
379 tomas.vondra 430 CBC 36 : break;
379 tomas.vondra 431 ECB : }
432 : }
433 :
379 tomas.vondra 434 GIC 115 : bms_free(idattrs);
379 tomas.vondra 435 CBC 115 : bms_free(columns);
379 tomas.vondra 436 ECB : }
437 :
379 tomas.vondra 438 GIC 289 : ReleaseSysCache(tuple);
379 tomas.vondra 439 ECB :
379 tomas.vondra 440 GIC 289 : return result;
379 tomas.vondra 441 ECB : }
442 :
443 : /* check_functions_in_node callback */
444 : static bool
411 akapila 445 GIC 198 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
411 akapila 446 ECB : {
411 akapila 447 GIC 198 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
411 akapila 448 ECB : func_id >= FirstNormalObjectId);
449 : }
450 :
451 : /*
452 : * The row filter walker checks if the row filter expression is a "simple
453 : * expression".
454 : *
455 : * It allows only simple or compound expressions such as:
456 : * - (Var Op Const)
457 : * - (Var Op Var)
458 : * - (Var Op Const) AND/OR (Var Op Const)
459 : * - etc
460 : * (where Var is a column of the table this filter belongs to)
461 : *
462 : * The simple expression has the following restrictions:
463 : * - User-defined operators are not allowed;
464 : * - User-defined functions are not allowed;
465 : * - User-defined types are not allowed;
466 : * - User-defined collations are not allowed;
467 : * - Non-immutable built-in functions are not allowed;
468 : * - System columns are not allowed.
469 : *
470 : * NOTES
471 : *
472 : * We don't allow user-defined functions/operators/types/collations because
473 : * (a) if a user drops a user-defined object used in a row filter expression or
474 : * if there is any other error while using it, the logical decoding
475 : * infrastructure won't be able to recover from such an error even if the
476 : * object is recreated again because a historic snapshot is used to evaluate
477 : * the row filter;
478 : * (b) a user-defined function can be used to access tables that could have
479 : * unpleasant results because a historic snapshot is used. That's why only
480 : * immutable built-in functions are allowed in row filter expressions.
481 : *
482 : * We don't allow system columns because currently, we don't have that
483 : * information in the tuple passed to downstream. Also, as we don't replicate
484 : * those to subscribers, there doesn't seem to be a need for a filter on those
485 : * columns.
486 : *
487 : * We can allow other node types after more analysis and testing.
488 : */
489 : static bool
411 akapila 490 GIC 664 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
411 akapila 491 ECB : {
411 akapila 492 GIC 664 : char *errdetail_msg = NULL;
411 akapila 493 ECB :
411 akapila 494 GIC 664 : if (node == NULL)
411 akapila 495 CBC 3 : return false;
411 akapila 496 ECB :
411 akapila 497 GIC 661 : switch (nodeTag(node))
411 akapila 498 ECB : {
411 akapila 499 GIC 178 : case T_Var:
411 akapila 500 ECB : /* System columns are not allowed. */
411 akapila 501 GIC 178 : if (((Var *) node)->varattno < InvalidAttrNumber)
411 akapila 502 CBC 3 : errdetail_msg = _("System columns are not allowed.");
503 178 : break;
504 176 : case T_OpExpr:
411 akapila 505 ECB : case T_DistinctExpr:
506 : case T_NullIfExpr:
507 : /* OK, except user-defined operators are not allowed. */
411 akapila 508 GIC 176 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
411 akapila 509 CBC 3 : errdetail_msg = _("User-defined operators are not allowed.");
510 176 : break;
511 3 : case T_ScalarArrayOpExpr:
411 akapila 512 ECB : /* OK, except user-defined operators are not allowed. */
411 akapila 513 GIC 3 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
411 akapila 514 LBC 0 : errdetail_msg = _("User-defined operators are not allowed.");
411 akapila 515 EUB :
516 : /*
517 : * We don't need to check the hashfuncid and negfuncid of
518 : * ScalarArrayOpExpr as those functions are only built for a
519 : * subquery.
520 : */
411 akapila 521 GIC 3 : break;
411 akapila 522 CBC 3 : case T_RowCompareExpr:
411 akapila 523 ECB : {
524 : ListCell *opid;
525 :
526 : /* OK, except user-defined operators are not allowed. */
411 akapila 527 GIC 9 : foreach(opid, ((RowCompareExpr *) node)->opnos)
411 akapila 528 ECB : {
411 akapila 529 GIC 6 : if (lfirst_oid(opid) >= FirstNormalObjectId)
411 akapila 530 ECB : {
411 akapila 531 UIC 0 : errdetail_msg = _("User-defined operators are not allowed.");
411 akapila 532 UBC 0 : break;
411 akapila 533 EUB : }
534 : }
535 : }
411 akapila 536 GIC 3 : break;
411 akapila 537 CBC 298 : case T_Const:
411 akapila 538 ECB : case T_FuncExpr:
539 : case T_BoolExpr:
540 : case T_RelabelType:
541 : case T_CollateExpr:
542 : case T_CaseExpr:
543 : case T_CaseTestExpr:
544 : case T_ArrayExpr:
545 : case T_RowExpr:
546 : case T_CoalesceExpr:
547 : case T_MinMaxExpr:
548 : case T_XmlExpr:
549 : case T_NullTest:
550 : case T_BooleanTest:
551 : case T_List:
552 : /* OK, supported */
411 akapila 553 GIC 298 : break;
411 akapila 554 CBC 3 : default:
197 peter 555 3 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
411 akapila 556 3 : break;
411 akapila 557 ECB : }
558 :
559 : /*
560 : * For all the supported nodes, if we haven't already found a problem,
561 : * check the types, functions, and collations used in it. We check List
562 : * by walking through each element.
563 : */
193 alvherre 564 GIC 661 : if (!errdetail_msg && !IsA(node, List))
193 alvherre 565 ECB : {
193 alvherre 566 GIC 625 : if (exprType(node) >= FirstNormalObjectId)
193 alvherre 567 CBC 3 : errdetail_msg = _("User-defined types are not allowed.");
568 622 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
193 alvherre 569 ECB : (void *) pstate))
193 alvherre 570 GIC 6 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
193 alvherre 571 CBC 1232 : else if (exprCollation(node) >= FirstNormalObjectId ||
572 616 : exprInputCollation(node) >= FirstNormalObjectId)
573 3 : errdetail_msg = _("User-defined collations are not allowed.");
193 alvherre 574 ECB : }
575 :
576 : /*
577 : * If we found a problem in this node, throw error now. Otherwise keep
578 : * going.
579 : */
411 akapila 580 GIC 661 : if (errdetail_msg)
411 akapila 581 CBC 21 : ereport(ERROR,
411 akapila 582 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
583 : errmsg("invalid publication WHERE expression"),
584 : errdetail_internal("%s", errdetail_msg),
585 : parser_errposition(pstate, exprLocation(node))));
586 :
411 akapila 587 GIC 640 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
411 akapila 588 ECB : (void *) pstate);
589 : }
590 :
591 : /*
592 : * Check if the row filter expression is a "simple expression".
593 : *
594 : * See check_simple_rowfilter_expr_walker for details.
595 : */
596 : static bool
411 akapila 597 GIC 172 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
411 akapila 598 ECB : {
411 akapila 599 GIC 172 : return check_simple_rowfilter_expr_walker(node, pstate);
411 akapila 600 ECB : }
601 :
602 : /*
603 : * Transform the publication WHERE expression for all the relations in the list,
604 : * ensuring it is coerced to boolean and necessary collation information is
605 : * added if required, and add a new nsitem/RTE for the associated relation to
606 : * the ParseState's namespace list.
607 : *
608 : * Also check the publication row filter expression and throw an error if
609 : * anything not permitted or unexpected is encountered.
610 : */
611 : static void
411 akapila 612 GIC 511 : TransformPubWhereClauses(List *tables, const char *queryString,
411 akapila 613 ECB : bool pubviaroot)
614 : {
615 : ListCell *lc;
616 :
411 akapila 617 GIC 1022 : foreach(lc, tables)
411 akapila 618 ECB : {
619 : ParseNamespaceItem *nsitem;
411 akapila 620 GIC 541 : Node *whereclause = NULL;
411 akapila 621 ECB : ParseState *pstate;
411 akapila 622 GIC 541 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
411 akapila 623 ECB :
411 akapila 624 GIC 541 : if (pri->whereClause == NULL)
411 akapila 625 CBC 360 : continue;
411 akapila 626 ECB :
627 : /*
628 : * If the publication doesn't publish changes via the root partitioned
629 : * table, the partition's row filter will be used. So disallow using
630 : * WHERE clause on partitioned table in this case.
631 : */
411 akapila 632 GIC 181 : if (!pubviaroot &&
411 akapila 633 CBC 170 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
634 3 : ereport(ERROR,
411 akapila 635 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
636 : errmsg("cannot use publication WHERE clause for relation \"%s\"",
637 : RelationGetRelationName(pri->relation)),
638 : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
639 : "publish_via_partition_root")));
640 :
641 : /*
642 : * A fresh pstate is required so that we only have "this" table in its
643 : * rangetable
644 : */
411 akapila 645 GIC 178 : pstate = make_parsestate(NULL);
411 akapila 646 CBC 178 : pstate->p_sourcetext = queryString;
647 178 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
411 akapila 648 ECB : AccessShareLock, NULL,
649 : false, false);
411 akapila 650 GIC 178 : addNSItemToQuery(pstate, nsitem, false, true, true);
411 akapila 651 ECB :
411 akapila 652 GIC 178 : whereclause = transformWhereClause(pstate,
411 akapila 653 CBC 178 : copyObject(pri->whereClause),
411 akapila 654 ECB : EXPR_KIND_WHERE,
655 : "PUBLICATION WHERE");
656 :
657 : /* Fix up collation information */
411 akapila 658 GIC 172 : assign_expr_collations(pstate, whereclause);
411 akapila 659 ECB :
660 : /*
661 : * We allow only simple expressions in row filters. See
662 : * check_simple_rowfilter_expr_walker.
663 : */
411 akapila 664 GIC 172 : check_simple_rowfilter_expr(whereclause, pstate);
411 akapila 665 ECB :
411 akapila 666 GIC 151 : free_parsestate(pstate);
411 akapila 667 ECB :
411 akapila 668 GIC 151 : pri->whereClause = whereclause;
411 akapila 669 ECB : }
411 akapila 670 GIC 481 : }
411 akapila 671 ECB :
672 :
673 : /*
674 : * Given a list of tables that are going to be added to a publication,
675 : * verify that they fulfill the necessary preconditions, namely: no tables
676 : * have a column list if any schema is published; and partitioned tables do
677 : * not have column lists if publish_via_partition_root is not set.
678 : *
679 : * 'publish_schema' indicates that the publication contains any TABLES IN
680 : * SCHEMA elements (newly added in this command, or preexisting).
681 : * 'pubviaroot' is the value of publish_via_partition_root.
682 : */
683 : static void
194 alvherre 684 GIC 481 : CheckPubRelationColumnList(char *pubname, List *tables,
198 akapila 685 ECB : bool publish_schema, bool pubviaroot)
686 : {
687 : ListCell *lc;
688 :
379 tomas.vondra 689 GIC 977 : foreach(lc, tables)
379 tomas.vondra 690 ECB : {
379 tomas.vondra 691 GIC 511 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
379 tomas.vondra 692 ECB :
379 tomas.vondra 693 GIC 511 : if (pri->columns == NIL)
379 tomas.vondra 694 CBC 348 : continue;
379 tomas.vondra 695 ECB :
696 : /*
697 : * Disallow specifying column list if any schema is in the
698 : * publication.
699 : *
700 : * XXX We could instead just forbid the case when the publication
701 : * tries to publish the table with a column list and a schema for that
702 : * table. However, if we do that then we need a restriction during
703 : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
704 : * seem to be a good idea.
705 : */
198 akapila 706 GIC 163 : if (publish_schema)
198 akapila 707 CBC 12 : ereport(ERROR,
198 akapila 708 ECB : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
709 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
710 : get_namespace_name(RelationGetNamespace(pri->relation)),
711 : RelationGetRelationName(pri->relation), pubname),
712 : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
713 :
714 : /*
715 : * If the publication doesn't publish changes via the root partitioned
716 : * table, the partition's column list will be used. So disallow using
717 : * a column list on the partitioned table in this case.
718 : */
379 tomas.vondra 719 GIC 151 : if (!pubviaroot &&
379 tomas.vondra 720 CBC 113 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
721 3 : ereport(ERROR,
379 tomas.vondra 722 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
723 : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
724 : get_namespace_name(RelationGetNamespace(pri->relation)),
725 : RelationGetRelationName(pri->relation), pubname),
726 : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
727 : "publish_via_partition_root")));
728 : }
379 tomas.vondra 729 GIC 466 : }
379 tomas.vondra 730 ECB :
731 : /*
732 : * Create new publication.
733 : */
734 : ObjectAddress
633 dean.a.rasheed 735 GIC 325 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
2271 peter_e 736 ECB : {
737 : Relation rel;
738 : ObjectAddress myself;
739 : Oid puboid;
740 : bool nulls[Natts_pg_publication];
741 : Datum values[Natts_pg_publication];
742 : HeapTuple tup;
743 : bool publish_given;
744 : PublicationActions pubactions;
745 : bool publish_via_partition_root_given;
746 : bool publish_via_partition_root;
747 : AclResult aclresult;
367 tomas.vondra 748 GIC 325 : List *relations = NIL;
367 tomas.vondra 749 CBC 325 : List *schemaidlist = NIL;
381 tomas.vondra 750 ECB :
751 : /* must have CREATE privilege on database */
147 peter 752 GNC 325 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
2271 peter_e 753 CBC 325 : if (aclresult != ACLCHECK_OK)
1954 754 3 : aclcheck_error(aclresult, OBJECT_DATABASE,
2271 755 3 : get_database_name(MyDatabaseId));
2271 peter_e 756 ECB :
757 : /* FOR ALL TABLES requires superuser */
367 tomas.vondra 758 GIC 322 : if (stmt->for_all_tables && !superuser())
2271 peter_e 759 LBC 0 : ereport(ERROR,
2271 peter_e 760 EUB : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
761 : errmsg("must be superuser to create FOR ALL TABLES publication")));
762 :
1539 andres 763 GIC 322 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2271 peter_e 764 ECB :
765 : /* Check if name is used */
1601 andres 766 GIC 322 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
1601 andres 767 ECB : CStringGetDatum(stmt->pubname));
2271 peter_e 768 GIC 322 : if (OidIsValid(puboid))
2271 peter_e 769 CBC 3 : ereport(ERROR,
2271 peter_e 770 ECB : (errcode(ERRCODE_DUPLICATE_OBJECT),
771 : errmsg("publication \"%s\" already exists",
772 : stmt->pubname)));
773 :
774 : /* Form a tuple. */
2271 peter_e 775 GIC 319 : memset(values, 0, sizeof(values));
2271 peter_e 776 CBC 319 : memset(nulls, false, sizeof(nulls));
2271 peter_e 777 ECB :
2271 peter_e 778 GIC 319 : values[Anum_pg_publication_pubname - 1] =
2271 peter_e 779 CBC 319 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
780 319 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
2271 peter_e 781 ECB :
633 dean.a.rasheed 782 GIC 319 : parse_publication_options(pstate,
633 dean.a.rasheed 783 ECB : stmt->options,
784 : &publish_given, &pubactions,
785 : &publish_via_partition_root_given,
786 : &publish_via_partition_root);
787 :
1601 andres 788 GIC 310 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
1601 andres 789 ECB : Anum_pg_publication_oid);
1601 andres 790 GIC 310 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
2271 peter_e 791 CBC 310 : values[Anum_pg_publication_puballtables - 1] =
367 tomas.vondra 792 310 : BoolGetDatum(stmt->for_all_tables);
2271 peter_e 793 310 : values[Anum_pg_publication_pubinsert - 1] =
1096 peter 794 310 : BoolGetDatum(pubactions.pubinsert);
2271 peter_e 795 310 : values[Anum_pg_publication_pubupdate - 1] =
1096 peter 796 310 : BoolGetDatum(pubactions.pubupdate);
2271 peter_e 797 310 : values[Anum_pg_publication_pubdelete - 1] =
1096 peter 798 310 : BoolGetDatum(pubactions.pubdelete);
1828 peter_e 799 310 : values[Anum_pg_publication_pubtruncate - 1] =
1096 peter 800 310 : BoolGetDatum(pubactions.pubtruncate);
801 310 : values[Anum_pg_publication_pubviaroot - 1] =
802 310 : BoolGetDatum(publish_via_partition_root);
2271 peter_e 803 ECB :
2271 peter_e 804 GIC 310 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
2271 peter_e 805 ECB :
806 : /* Insert tuple into catalog. */
1601 andres 807 GIC 310 : CatalogTupleInsert(rel, tup);
2271 peter_e 808 CBC 310 : heap_freetuple(tup);
2271 peter_e 809 ECB :
2270 alvherre 810 GIC 310 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
2270 alvherre 811 ECB :
2271 peter_e 812 GIC 310 : ObjectAddressSet(myself, PublicationRelationId, puboid);
2271 peter_e 813 ECB :
814 : /* Make the changes visible. */
2271 peter_e 815 GIC 310 : CommandCounterIncrement();
2271 peter_e 816 ECB :
817 : /* Associate objects with the publication. */
367 tomas.vondra 818 GIC 310 : if (stmt->for_all_tables)
578 akapila 819 ECB : {
820 : /* Invalidate relcache so that publication info is rebuilt. */
578 akapila 821 GIC 24 : CacheInvalidateRelcacheAll();
578 akapila 822 ECB : }
823 : else
824 : {
367 tomas.vondra 825 GIC 286 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
367 tomas.vondra 826 ECB : &schemaidlist);
827 :
828 : /* FOR TABLES IN SCHEMA requires superuser */
235 tgl 829 GIC 277 : if (schemaidlist != NIL && !superuser())
529 akapila 830 CBC 3 : ereport(ERROR,
529 akapila 831 ECB : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
832 : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
833 :
235 tgl 834 GNC 274 : if (relations != NIL)
529 akapila 835 ECB : {
836 : List *rels;
837 :
367 tomas.vondra 838 GIC 181 : rels = OpenTableList(relations);
411 akapila 839 CBC 169 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
411 akapila 840 ECB : publish_via_partition_root);
841 :
194 alvherre 842 GIC 157 : CheckPubRelationColumnList(stmt->pubname, rels,
198 akapila 843 ECB : schemaidlist != NIL,
844 : publish_via_partition_root);
845 :
367 tomas.vondra 846 GIC 154 : PublicationAddTables(puboid, rels, true, NULL);
367 tomas.vondra 847 CBC 141 : CloseTableList(rels);
381 tomas.vondra 848 ECB : }
849 :
235 tgl 850 GNC 234 : if (schemaidlist != NIL)
381 tomas.vondra 851 ECB : {
852 : /*
853 : * Schema lock is held until the publication is created to prevent
854 : * concurrent schema deletion.
855 : */
367 tomas.vondra 856 GIC 67 : LockSchemaList(schemaidlist);
367 tomas.vondra 857 CBC 67 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
529 akapila 858 ECB : }
859 : }
860 :
1539 andres 861 GIC 255 : table_close(rel, RowExclusiveLock);
2271 peter_e 862 ECB :
2271 peter_e 863 GIC 255 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
2271 peter_e 864 ECB :
1366 tmunro 865 GIC 255 : if (wal_level != WAL_LEVEL_LOGICAL)
1366 tmunro 866 CBC 153 : ereport(WARNING,
1366 tmunro 867 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
868 : errmsg("wal_level is insufficient to publish logical changes"),
869 : errhint("Set wal_level to \"logical\" before creating subscriptions.")));
870 :
2271 peter_e 871 GIC 255 : return myself;
2271 peter_e 872 ECB : }
873 :
874 : /*
875 : * Change options of a publication.
876 : */
877 : static void
633 dean.a.rasheed 878 GIC 55 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
633 dean.a.rasheed 879 ECB : Relation rel, HeapTuple tup)
880 : {
881 : bool nulls[Natts_pg_publication];
882 : bool replaces[Natts_pg_publication];
883 : Datum values[Natts_pg_publication];
884 : bool publish_given;
885 : PublicationActions pubactions;
886 : bool publish_via_partition_root_given;
887 : bool publish_via_partition_root;
888 : ObjectAddress obj;
889 : Form_pg_publication pubform;
411 akapila 890 GIC 55 : List *root_relids = NIL;
411 akapila 891 ECB : ListCell *lc;
892 :
633 dean.a.rasheed 893 GIC 55 : parse_publication_options(pstate,
633 dean.a.rasheed 894 ECB : stmt->options,
895 : &publish_given, &pubactions,
896 : &publish_via_partition_root_given,
897 : &publish_via_partition_root);
898 :
411 akapila 899 GIC 55 : pubform = (Form_pg_publication) GETSTRUCT(tup);
411 akapila 900 ECB :
901 : /*
902 : * If the publication doesn't publish changes via the root partitioned
903 : * table, the partition's row filter and column list will be used. So
904 : * disallow using WHERE clause and column lists on partitioned table in
905 : * this case.
906 : */
411 akapila 907 GIC 55 : if (!pubform->puballtables && publish_via_partition_root_given &&
411 akapila 908 CBC 40 : !publish_via_partition_root)
411 akapila 909 ECB : {
910 : /*
911 : * Lock the publication so nobody else can do anything with it. This
912 : * prevents concurrent alter to add partitioned table(s) with WHERE
913 : * clause(s) and/or column lists which we don't allow when not
914 : * publishing via root.
915 : */
411 akapila 916 GIC 24 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
411 akapila 917 ECB : AccessShareLock);
918 :
411 akapila 919 GIC 24 : root_relids = GetPublicationRelations(pubform->oid,
411 akapila 920 ECB : PUBLICATION_PART_ROOT);
921 :
411 akapila 922 GIC 42 : foreach(lc, root_relids)
411 akapila 923 ECB : {
411 akapila 924 GIC 24 : Oid relid = lfirst_oid(lc);
361 alvherre 925 ECB : HeapTuple rftuple;
926 : char relkind;
927 : char *relname;
928 : bool has_rowfilter;
929 : bool has_collist;
930 :
931 : /*
932 : * Beware: we don't have lock on the relations, so cope silently
933 : * with the cache lookups returning NULL.
934 : */
935 :
411 akapila 936 GIC 24 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
411 akapila 937 ECB : ObjectIdGetDatum(relid),
938 : ObjectIdGetDatum(pubform->oid));
361 alvherre 939 GIC 24 : if (!HeapTupleIsValid(rftuple))
361 alvherre 940 LBC 0 : continue;
361 alvherre 941 GBC 24 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
361 alvherre 942 CBC 24 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
943 24 : if (!has_rowfilter && !has_collist)
411 akapila 944 ECB : {
361 alvherre 945 GIC 6 : ReleaseSysCache(rftuple);
361 alvherre 946 CBC 6 : continue;
361 alvherre 947 ECB : }
948 :
361 alvherre 949 GIC 18 : relkind = get_rel_relkind(relid);
361 alvherre 950 CBC 18 : if (relkind != RELKIND_PARTITIONED_TABLE)
361 alvherre 951 ECB : {
361 alvherre 952 GIC 12 : ReleaseSysCache(rftuple);
361 alvherre 953 CBC 12 : continue;
361 alvherre 954 ECB : }
361 alvherre 955 GIC 6 : relname = get_rel_name(relid);
361 alvherre 956 CBC 6 : if (relname == NULL) /* table concurrently dropped */
361 alvherre 957 ECB : {
411 akapila 958 UIC 0 : ReleaseSysCache(rftuple);
361 alvherre 959 UBC 0 : continue;
411 akapila 960 EUB : }
961 :
361 alvherre 962 GIC 6 : if (has_rowfilter)
361 alvherre 963 CBC 3 : ereport(ERROR,
361 alvherre 964 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
965 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
966 : "publish_via_partition_root",
967 : stmt->pubname),
968 : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
969 : relname, "publish_via_partition_root")));
361 alvherre 970 GIC 3 : Assert(has_collist);
361 alvherre 971 CBC 3 : ereport(ERROR,
361 alvherre 972 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
973 : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
974 : "publish_via_partition_root",
975 : stmt->pubname),
976 : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
977 : relname, "publish_via_partition_root")));
978 : }
979 : }
980 :
981 : /* Everything ok, form a new tuple. */
2271 peter_e 982 GIC 49 : memset(values, 0, sizeof(values));
2271 peter_e 983 CBC 49 : memset(nulls, false, sizeof(nulls));
984 49 : memset(replaces, false, sizeof(replaces));
2271 peter_e 985 ECB :
2158 peter_e 986 GIC 49 : if (publish_given)
2271 peter_e 987 ECB : {
1096 peter 988 GIC 14 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
2271 peter_e 989 CBC 14 : replaces[Anum_pg_publication_pubinsert - 1] = true;
2158 peter_e 990 ECB :
1096 peter 991 GIC 14 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
2271 peter_e 992 CBC 14 : replaces[Anum_pg_publication_pubupdate - 1] = true;
2158 peter_e 993 ECB :
1096 peter 994 GIC 14 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
2271 peter_e 995 CBC 14 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1828 peter_e 996 ECB :
1096 peter 997 GIC 14 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1828 peter_e 998 CBC 14 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
2271 peter_e 999 ECB : }
1000 :
1096 peter 1001 GIC 49 : if (publish_via_partition_root_given)
1096 peter 1002 ECB : {
1096 peter 1003 GIC 35 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1096 peter 1004 CBC 35 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1096 peter 1005 ECB : }
1006 :
2271 peter_e 1007 GIC 49 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
2271 peter_e 1008 ECB : replaces);
1009 :
1010 : /* Update the catalog. */
2259 alvherre 1011 GIC 49 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2271 peter_e 1012 ECB :
2271 peter_e 1013 GIC 49 : CommandCounterIncrement();
2271 peter_e 1014 ECB :
1601 andres 1015 GIC 49 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1601 andres 1016 ECB :
1017 : /* Invalidate the relcache. */
367 tomas.vondra 1018 GIC 49 : if (pubform->puballtables)
2271 peter_e 1019 ECB : {
2271 peter_e 1020 GIC 4 : CacheInvalidateRelcacheAll();
2271 peter_e 1021 ECB : }
1022 : else
1023 : {
529 akapila 1024 GIC 45 : List *relids = NIL;
529 akapila 1025 CBC 45 : List *schemarelids = NIL;
529 akapila 1026 ECB :
1027 : /*
1028 : * For any partitioned tables contained in the publication, we must
1029 : * invalidate all partitions contained in the respective partition
1030 : * trees, not just those explicitly mentioned in the publication.
1031 : */
411 akapila 1032 GIC 45 : if (root_relids == NIL)
411 akapila 1033 CBC 27 : relids = GetPublicationRelations(pubform->oid,
411 akapila 1034 ECB : PUBLICATION_PART_ALL);
1035 : else
1036 : {
1037 : /*
1038 : * We already got tables explicitly mentioned in the publication.
1039 : * Now get all partitions for the partitioned table in the list.
1040 : */
411 akapila 1041 GIC 36 : foreach(lc, root_relids)
411 akapila 1042 CBC 18 : relids = GetPubPartitionOptionRelations(relids,
411 akapila 1043 ECB : PUBLICATION_PART_ALL,
1044 : lfirst_oid(lc));
1045 : }
1046 :
529 akapila 1047 GIC 45 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
529 akapila 1048 ECB : PUBLICATION_PART_ALL);
529 akapila 1049 GIC 45 : relids = list_concat_unique_oid(relids, schemarelids);
2271 peter_e 1050 ECB :
564 akapila 1051 GIC 45 : InvalidatePublicationRels(relids);
2271 peter_e 1052 ECB : }
1053 :
1601 andres 1054 GIC 49 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
2271 peter_e 1055 CBC 49 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2271 peter_e 1056 ECB : (Node *) stmt);
1057 :
1601 andres 1058 GIC 49 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
2271 peter_e 1059 CBC 49 : }
2271 peter_e 1060 ECB :
1061 : /*
1062 : * Invalidate the relations.
1063 : */
1064 : void
564 akapila 1065 GIC 1071 : InvalidatePublicationRels(List *relids)
564 akapila 1066 ECB : {
1067 : /*
1068 : * We don't want to send too many individual messages, at some point it's
1069 : * cheaper to just reset whole relcache.
1070 : */
564 akapila 1071 GIC 1071 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
564 akapila 1072 ECB : {
1073 : ListCell *lc;
1074 :
564 akapila 1075 GIC 9421 : foreach(lc, relids)
564 akapila 1076 CBC 8350 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
564 akapila 1077 ECB : }
1078 : else
564 akapila 1079 UIC 0 : CacheInvalidateRelcacheAll();
564 akapila 1080 GBC 1071 : }
564 akapila 1081 ECB :
1082 : /*
1083 : * Add or remove table to/from publication.
1084 : */
1085 : static void
529 akapila 1086 GIC 421 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
198 akapila 1087 ECB : List *tables, const char *queryString,
1088 : bool publish_schema)
1089 : {
2271 peter_e 1090 GIC 421 : List *rels = NIL;
2271 peter_e 1091 CBC 421 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1601 andres 1092 421 : Oid pubid = pubform->oid;
2271 peter_e 1093 ECB :
1094 : /*
1095 : * Nothing to do if no objects, except in SET: for that it is quite
1096 : * possible that user has not specified any tables in which case we need
1097 : * to remove all the existing tables.
1098 : */
461 alvherre 1099 GIC 421 : if (!tables && stmt->action != AP_SetObjects)
529 akapila 1100 CBC 33 : return;
2271 peter_e 1101 ECB :
367 tomas.vondra 1102 GIC 388 : rels = OpenTableList(tables);
2271 peter_e 1103 ECB :
461 alvherre 1104 GIC 388 : if (stmt->action == AP_AddObjects)
529 akapila 1105 ECB : {
411 akapila 1106 GIC 131 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
411 akapila 1107 ECB :
198 akapila 1108 GIC 122 : publish_schema |= is_schema_publication(pubid);
198 akapila 1109 ECB :
194 alvherre 1110 GIC 122 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
198 akapila 1111 CBC 122 : pubform->pubviaroot);
379 tomas.vondra 1112 ECB :
367 tomas.vondra 1113 GIC 116 : PublicationAddTables(pubid, rels, false, stmt);
529 akapila 1114 ECB : }
461 alvherre 1115 GIC 257 : else if (stmt->action == AP_DropObjects)
367 tomas.vondra 1116 CBC 46 : PublicationDropTables(pubid, rels, false);
461 alvherre 1117 ECB : else /* AP_SetObjects */
1118 : {
1060 tgl 1119 GIC 211 : List *oldrelids = GetPublicationRelations(pubid,
1060 tgl 1120 ECB : PUBLICATION_PART_ROOT);
2271 peter_e 1121 GIC 211 : List *delrels = NIL;
2271 peter_e 1122 ECB : ListCell *oldlc;
1123 :
411 akapila 1124 GIC 211 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
411 akapila 1125 ECB :
194 alvherre 1126 GIC 202 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
198 akapila 1127 CBC 202 : pubform->pubviaroot);
379 tomas.vondra 1128 ECB :
1129 : /*
1130 : * To recreate the relation list for the publication, look for
1131 : * existing relations that do not need to be dropped.
1132 : */
2271 peter_e 1133 GIC 385 : foreach(oldlc, oldrelids)
2271 peter_e 1134 ECB : {
2271 peter_e 1135 GIC 189 : Oid oldrelid = lfirst_oid(oldlc);
2271 peter_e 1136 ECB : ListCell *newlc;
1137 : PublicationRelInfo *oldrel;
2271 peter_e 1138 GIC 189 : bool found = false;
411 akapila 1139 ECB : HeapTuple rftuple;
411 akapila 1140 GIC 189 : Node *oldrelwhereclause = NULL;
379 tomas.vondra 1141 CBC 189 : Bitmapset *oldcolumns = NULL;
411 akapila 1142 ECB :
1143 : /* look up the cache for the old relmap */
411 akapila 1144 GIC 189 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
411 akapila 1145 ECB : ObjectIdGetDatum(oldrelid),
1146 : ObjectIdGetDatum(pubid));
1147 :
1148 : /*
1149 : * See if the existing relation currently has a WHERE clause or a
1150 : * column list. We need to compare those too.
1151 : */
411 akapila 1152 GIC 189 : if (HeapTupleIsValid(rftuple))
411 akapila 1153 ECB : {
379 tomas.vondra 1154 GIC 189 : bool isnull = true;
411 akapila 1155 ECB : Datum whereClauseDatum;
1156 : Datum columnListDatum;
1157 :
1158 : /* Load the WHERE clause for this table. */
411 akapila 1159 GIC 189 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
411 akapila 1160 ECB : Anum_pg_publication_rel_prqual,
1161 : &isnull);
379 tomas.vondra 1162 GIC 189 : if (!isnull)
411 akapila 1163 CBC 105 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
411 akapila 1164 ECB :
1165 : /* Transform the int2vector column list to a bitmap. */
379 tomas.vondra 1166 GIC 189 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
332 tgl 1167 ECB : Anum_pg_publication_rel_prattrs,
1168 : &isnull);
1169 :
379 tomas.vondra 1170 GIC 189 : if (!isnull)
379 tomas.vondra 1171 CBC 62 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
379 tomas.vondra 1172 ECB :
411 akapila 1173 GIC 189 : ReleaseSysCache(rftuple);
411 akapila 1174 ECB : }
1175 :
2271 peter_e 1176 GIC 371 : foreach(newlc, rels)
2271 peter_e 1177 ECB : {
1178 : PublicationRelInfo *newpubrel;
1179 : Oid newrelid;
332 tgl 1180 GIC 190 : Bitmapset *newcolumns = NULL;
2271 peter_e 1181 ECB :
580 alvherre 1182 GIC 190 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
379 tomas.vondra 1183 CBC 190 : newrelid = RelationGetRelid(newpubrel->relation);
379 tomas.vondra 1184 ECB :
1185 : /*
1186 : * If the new publication has column list, transform it to a
1187 : * bitmap too.
1188 : */
379 tomas.vondra 1189 GIC 190 : if (newpubrel->columns)
379 tomas.vondra 1190 ECB : {
1191 : ListCell *lc;
1192 :
379 tomas.vondra 1193 GIC 177 : foreach(lc, newpubrel->columns)
379 tomas.vondra 1194 ECB : {
379 tomas.vondra 1195 GIC 109 : char *colname = strVal(lfirst(lc));
379 tomas.vondra 1196 CBC 109 : AttrNumber attnum = get_attnum(newrelid, colname);
379 tomas.vondra 1197 ECB :
379 tomas.vondra 1198 GIC 109 : newcolumns = bms_add_member(newcolumns, attnum);
379 tomas.vondra 1199 ECB : }
1200 : }
1201 :
1202 : /*
1203 : * Check if any of the new set of relations matches with the
1204 : * existing relations in the publication. Additionally, if the
1205 : * relation has an associated WHERE clause, check the WHERE
1206 : * expressions also match. Same for the column list. Drop the
1207 : * rest.
1208 : */
580 alvherre 1209 GIC 190 : if (RelationGetRelid(newpubrel->relation) == oldrelid)
2271 peter_e 1210 ECB : {
379 tomas.vondra 1211 GIC 130 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
379 tomas.vondra 1212 CBC 34 : bms_equal(oldcolumns, newcolumns))
411 akapila 1213 ECB : {
411 akapila 1214 GIC 8 : found = true;
411 akapila 1215 CBC 8 : break;
411 akapila 1216 ECB : }
1217 : }
1218 : }
1219 :
1220 : /*
1221 : * Add the non-matched relations to a list so that they can be
1222 : * dropped.
1223 : */
411 akapila 1224 GIC 189 : if (!found)
411 akapila 1225 ECB : {
411 akapila 1226 GIC 181 : oldrel = palloc(sizeof(PublicationRelInfo));
411 akapila 1227 CBC 181 : oldrel->whereClause = NULL;
379 tomas.vondra 1228 181 : oldrel->columns = NIL;
411 akapila 1229 181 : oldrel->relation = table_open(oldrelid,
411 akapila 1230 ECB : ShareUpdateExclusiveLock);
411 akapila 1231 GIC 181 : delrels = lappend(delrels, oldrel);
2271 peter_e 1232 ECB : }
1233 : }
1234 :
1235 : /* And drop them. */
367 tomas.vondra 1236 GIC 196 : PublicationDropTables(pubid, delrels, true);
2271 peter_e 1237 ECB :
1238 : /*
1239 : * Don't bother calculating the difference for adding, we'll catch and
1240 : * skip existing ones when doing catalog update.
1241 : */
367 tomas.vondra 1242 GIC 196 : PublicationAddTables(pubid, rels, true, stmt);
2271 peter_e 1243 ECB :
367 tomas.vondra 1244 GIC 196 : CloseTableList(delrels);
2271 peter_e 1245 ECB : }
1246 :
367 tomas.vondra 1247 GIC 331 : CloseTableList(rels);
2271 peter_e 1248 ECB : }
1249 :
1250 : /*
1251 : * Alter the publication schemas.
1252 : *
1253 : * Add or remove schemas to/from publication.
1254 : */
1255 : static void
529 akapila 1256 GIC 364 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
367 tomas.vondra 1257 ECB : HeapTuple tup, List *schemaidlist)
1258 : {
529 akapila 1259 GIC 364 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
529 akapila 1260 ECB :
1261 : /*
1262 : * Nothing to do if no objects, except in SET: for that it is quite
1263 : * possible that user has not specified any schemas in which case we need
1264 : * to remove all the existing schemas.
1265 : */
461 alvherre 1266 GIC 364 : if (!schemaidlist && stmt->action != AP_SetObjects)
529 akapila 1267 CBC 135 : return;
529 akapila 1268 ECB :
1269 : /*
1270 : * Schema lock is held until the publication is altered to prevent
1271 : * concurrent schema deletion.
1272 : */
529 akapila 1273 GIC 229 : LockSchemaList(schemaidlist);
461 alvherre 1274 CBC 229 : if (stmt->action == AP_AddObjects)
529 akapila 1275 ECB : {
1276 : ListCell *lc;
1277 : List *reloids;
1278 :
367 tomas.vondra 1279 GIC 14 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
529 akapila 1280 ECB :
198 akapila 1281 GIC 18 : foreach(lc, reloids)
198 akapila 1282 ECB : {
1283 : HeapTuple coltuple;
1284 :
198 akapila 1285 GIC 7 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
198 akapila 1286 ECB : ObjectIdGetDatum(lfirst_oid(lc)),
1287 : ObjectIdGetDatum(pubform->oid));
1288 :
198 akapila 1289 GIC 7 : if (!HeapTupleIsValid(coltuple))
198 akapila 1290 LBC 0 : continue;
198 akapila 1291 EUB :
1292 : /*
1293 : * Disallow adding schema if column list is already part of the
1294 : * publication. See CheckPubRelationColumnList.
1295 : */
198 akapila 1296 GIC 7 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
198 akapila 1297 CBC 3 : ereport(ERROR,
198 akapila 1298 ECB : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1299 : errmsg("cannot add schema to publication \"%s\"",
1300 : stmt->pubname),
1301 : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1302 :
198 akapila 1303 GIC 4 : ReleaseSysCache(coltuple);
198 akapila 1304 ECB : }
1305 :
367 tomas.vondra 1306 GIC 11 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
529 akapila 1307 ECB : }
461 alvherre 1308 GIC 215 : else if (stmt->action == AP_DropObjects)
367 tomas.vondra 1309 CBC 19 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
461 alvherre 1310 ECB : else /* AP_SetObjects */
1311 : {
367 tomas.vondra 1312 GIC 196 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
529 akapila 1313 CBC 196 : List *delschemas = NIL;
529 akapila 1314 ECB :
1315 : /* Identify which schemas should be dropped */
529 akapila 1316 GIC 196 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
529 akapila 1317 ECB :
1318 : /*
1319 : * Schema lock is held until the publication is altered to prevent
1320 : * concurrent schema deletion.
1321 : */
529 akapila 1322 GIC 196 : LockSchemaList(delschemas);
529 akapila 1323 ECB :
1324 : /* And drop them */
367 tomas.vondra 1325 GIC 196 : PublicationDropSchemas(pubform->oid, delschemas, true);
529 akapila 1326 ECB :
1327 : /*
1328 : * Don't bother calculating the difference for adding, we'll catch and
1329 : * skip existing ones when doing catalog update.
1330 : */
367 tomas.vondra 1331 GIC 196 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
529 akapila 1332 ECB : }
1333 : }
1334 :
1335 : /*
1336 : * Check if relations and schemas can be in a given publication and throw
1337 : * appropriate error if not.
1338 : */
1339 : static void
529 akapila 1340 GIC 442 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
367 tomas.vondra 1341 ECB : List *tables, List *schemaidlist)
1342 : {
529 akapila 1343 GIC 442 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
529 akapila 1344 ECB :
461 alvherre 1345 GIC 442 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
367 tomas.vondra 1346 CBC 47 : schemaidlist && !superuser())
529 akapila 1347 3 : ereport(ERROR,
529 akapila 1348 ECB : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1349 : errmsg("must be superuser to add or set schemas")));
1350 :
1351 : /*
1352 : * Check that user is allowed to manipulate the publication tables in
1353 : * schema
1354 : */
367 tomas.vondra 1355 GIC 439 : if (schemaidlist && pubform->puballtables)
529 akapila 1356 CBC 9 : ereport(ERROR,
529 akapila 1357 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1358 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1359 : NameStr(pubform->pubname)),
1360 : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1361 :
1362 : /* Check that user is allowed to manipulate the publication tables. */
529 akapila 1363 GIC 430 : if (tables && pubform->puballtables)
529 akapila 1364 CBC 9 : ereport(ERROR,
529 akapila 1365 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1366 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1367 : NameStr(pubform->pubname)),
1368 : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
529 akapila 1369 GIC 421 : }
529 akapila 1370 ECB :
1371 : /*
1372 : * Alter the existing publication.
1373 : *
1374 : * This is dispatcher function for AlterPublicationOptions,
1375 : * AlterPublicationSchemas and AlterPublicationTables.
1376 : */
1377 : void
633 dean.a.rasheed 1378 GIC 506 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
2271 peter_e 1379 ECB : {
1380 : Relation rel;
1381 : HeapTuple tup;
1382 : Form_pg_publication pubform;
1383 :
1539 andres 1384 GIC 506 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2271 peter_e 1385 ECB :
2271 peter_e 1386 GIC 506 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
2271 peter_e 1387 ECB : CStringGetDatum(stmt->pubname));
1388 :
2271 peter_e 1389 GIC 506 : if (!HeapTupleIsValid(tup))
2271 peter_e 1390 LBC 0 : ereport(ERROR,
2271 peter_e 1391 EUB : (errcode(ERRCODE_UNDEFINED_OBJECT),
1392 : errmsg("publication \"%s\" does not exist",
1393 : stmt->pubname)));
1394 :
1601 andres 1395 GIC 506 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1601 andres 1396 ECB :
1397 : /* must be owner */
147 peter 1398 GNC 506 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1954 peter_e 1399 LBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2271 peter_e 1400 UBC 0 : stmt->pubname);
2271 peter_e 1401 EUB :
2271 peter_e 1402 GIC 506 : if (stmt->options)
633 dean.a.rasheed 1403 CBC 55 : AlterPublicationOptions(pstate, stmt, rel, tup);
2271 peter_e 1404 ECB : else
1405 : {
367 tomas.vondra 1406 GIC 451 : List *relations = NIL;
367 tomas.vondra 1407 CBC 451 : List *schemaidlist = NIL;
411 akapila 1408 451 : Oid pubid = pubform->oid;
529 akapila 1409 ECB :
367 tomas.vondra 1410 GIC 451 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
367 tomas.vondra 1411 ECB : &schemaidlist);
1412 :
367 tomas.vondra 1413 GIC 442 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
529 akapila 1414 ECB :
411 akapila 1415 GIC 421 : heap_freetuple(tup);
411 akapila 1416 ECB :
1417 : /* Lock the publication so nobody else can do anything with it. */
411 akapila 1418 GIC 421 : LockDatabaseObject(PublicationRelationId, pubid, 0,
529 akapila 1419 ECB : AccessExclusiveLock);
1420 :
1421 : /*
1422 : * It is possible that by the time we acquire the lock on publication,
1423 : * concurrent DDL has removed it. We can test this by checking the
1424 : * existence of publication. We get the tuple again to avoid the risk
1425 : * of any publication option getting changed.
1426 : */
411 akapila 1427 GIC 421 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
411 akapila 1428 CBC 421 : if (!HeapTupleIsValid(tup))
529 akapila 1429 LBC 0 : ereport(ERROR,
529 akapila 1430 EUB : errcode(ERRCODE_UNDEFINED_OBJECT),
1431 : errmsg("publication \"%s\" does not exist",
1432 : stmt->pubname));
1433 :
198 akapila 1434 GIC 421 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
198 akapila 1435 ECB : schemaidlist != NIL);
367 tomas.vondra 1436 GIC 364 : AlterPublicationSchemas(stmt, tup, schemaidlist);
529 akapila 1437 ECB : }
1438 :
1439 : /* Cleanup. */
2271 peter_e 1440 GIC 404 : heap_freetuple(tup);
1539 andres 1441 CBC 404 : table_close(rel, RowExclusiveLock);
2271 peter_e 1442 404 : }
2271 peter_e 1443 ECB :
1444 : /*
1445 : * Remove relation from publication by mapping OID.
1446 : */
1447 : void
2271 peter_e 1448 GIC 374 : RemovePublicationRelById(Oid proid)
2271 peter_e 1449 ECB : {
1450 : Relation rel;
1451 : HeapTuple tup;
1452 : Form_pg_publication_rel pubrel;
564 akapila 1453 GIC 374 : List *relids = NIL;
2271 peter_e 1454 ECB :
1539 andres 1455 GIC 374 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
2271 peter_e 1456 ECB :
2271 peter_e 1457 GIC 374 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
2271 peter_e 1458 ECB :
2271 peter_e 1459 GIC 374 : if (!HeapTupleIsValid(tup))
2271 peter_e 1460 LBC 0 : elog(ERROR, "cache lookup failed for publication table %u",
2271 peter_e 1461 EUB : proid);
1462 :
2271 peter_e 1463 GIC 374 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
2271 peter_e 1464 ECB :
1465 : /*
1466 : * Invalidate relcache so that publication info is rebuilt.
1467 : *
1468 : * For the partitioned tables, we must invalidate all partitions contained
1469 : * in the respective partition hierarchies, not just the one explicitly
1470 : * mentioned in the publication. This is required because we implicitly
1471 : * publish the child tables when the parent table is published.
1472 : */
564 akapila 1473 GIC 374 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
564 akapila 1474 ECB : pubrel->prrelid);
1475 :
564 akapila 1476 GIC 374 : InvalidatePublicationRels(relids);
2271 peter_e 1477 ECB :
2258 tgl 1478 GIC 374 : CatalogTupleDelete(rel, &tup->t_self);
2271 peter_e 1479 ECB :
2271 peter_e 1480 GIC 374 : ReleaseSysCache(tup);
2271 peter_e 1481 ECB :
1539 andres 1482 GIC 374 : table_close(rel, RowExclusiveLock);
2271 peter_e 1483 CBC 374 : }
2271 peter_e 1484 ECB :
1485 : /*
1486 : * Remove the publication by mapping OID.
1487 : */
1488 : void
578 akapila 1489 GIC 178 : RemovePublicationById(Oid pubid)
578 akapila 1490 ECB : {
1491 : Relation rel;
1492 : HeapTuple tup;
1493 : Form_pg_publication pubform;
1494 :
578 akapila 1495 GIC 178 : rel = table_open(PublicationRelationId, RowExclusiveLock);
578 akapila 1496 ECB :
578 akapila 1497 GIC 178 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
578 akapila 1498 CBC 178 : if (!HeapTupleIsValid(tup))
578 akapila 1499 LBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
578 akapila 1500 EUB :
465 alvherre 1501 GIC 178 : pubform = (Form_pg_publication) GETSTRUCT(tup);
578 akapila 1502 ECB :
1503 : /* Invalidate relcache so that publication info is rebuilt. */
367 tomas.vondra 1504 GIC 178 : if (pubform->puballtables)
578 akapila 1505 CBC 10 : CacheInvalidateRelcacheAll();
578 akapila 1506 ECB :
578 akapila 1507 GIC 178 : CatalogTupleDelete(rel, &tup->t_self);
578 akapila 1508 ECB :
578 akapila 1509 GIC 178 : ReleaseSysCache(tup);
578 akapila 1510 ECB :
578 akapila 1511 GIC 178 : table_close(rel, RowExclusiveLock);
578 akapila 1512 CBC 178 : }
578 akapila 1513 ECB :
1514 : /*
1515 : * Remove schema from publication by mapping OID.
1516 : */
1517 : void
529 akapila 1518 GIC 96 : RemovePublicationSchemaById(Oid psoid)
529 akapila 1519 ECB : {
1520 : Relation rel;
1521 : HeapTuple tup;
529 akapila 1522 GIC 96 : List *schemaRels = NIL;
529 akapila 1523 ECB : Form_pg_publication_namespace pubsch;
1524 :
529 akapila 1525 GIC 96 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
529 akapila 1526 ECB :
529 akapila 1527 GIC 96 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
529 akapila 1528 ECB :
529 akapila 1529 GIC 96 : if (!HeapTupleIsValid(tup))
529 akapila 1530 LBC 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
529 akapila 1531 EUB :
529 akapila 1532 GIC 96 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
529 akapila 1533 ECB :
1534 : /*
1535 : * Invalidate relcache so that publication info is rebuilt. See
1536 : * RemovePublicationRelById for why we need to consider all the
1537 : * partitions.
1538 : */
529 akapila 1539 GIC 96 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
529 akapila 1540 ECB : PUBLICATION_PART_ALL);
529 akapila 1541 GIC 96 : InvalidatePublicationRels(schemaRels);
529 akapila 1542 ECB :
529 akapila 1543 GIC 96 : CatalogTupleDelete(rel, &tup->t_self);
529 akapila 1544 ECB :
529 akapila 1545 GIC 96 : ReleaseSysCache(tup);
529 akapila 1546 ECB :
529 akapila 1547 GIC 96 : table_close(rel, RowExclusiveLock);
529 akapila 1548 CBC 96 : }
529 akapila 1549 ECB :
1550 : /*
1551 : * Open relations specified by a PublicationTable list.
1552 : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1553 : * add them to a publication.
1554 : */
1555 : static List *
367 tomas.vondra 1556 GIC 569 : OpenTableList(List *tables)
2271 peter_e 1557 ECB : {
2271 peter_e 1558 GIC 569 : List *relids = NIL;
367 tomas.vondra 1559 CBC 569 : List *rels = NIL;
2271 peter_e 1560 ECB : ListCell *lc;
411 akapila 1561 GIC 569 : List *relids_with_rf = NIL;
379 tomas.vondra 1562 CBC 569 : List *relids_with_collist = NIL;
2271 peter_e 1563 ECB :
1564 : /*
1565 : * Open, share-lock, and check all the explicitly-specified relations
1566 : */
367 tomas.vondra 1567 GIC 1167 : foreach(lc, tables)
2271 peter_e 1568 ECB : {
580 alvherre 1569 GIC 610 : PublicationTable *t = lfirst_node(PublicationTable, lc);
580 alvherre 1570 CBC 610 : bool recurse = t->relation->inh;
1584 tgl 1571 ECB : Relation rel;
1572 : Oid myrelid;
1573 : PublicationRelInfo *pub_rel;
1574 :
1575 : /* Allow query cancel in case this takes a long time */
2271 peter_e 1576 GIC 610 : CHECK_FOR_INTERRUPTS();
2271 peter_e 1577 ECB :
580 alvherre 1578 GIC 610 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
2271 peter_e 1579 CBC 610 : myrelid = RelationGetRelid(rel);
2154 tgl 1580 ECB :
1581 : /*
1582 : * Filter out duplicates if user specifies "foo, foo".
1583 : *
1584 : * Note that this algorithm is known to not be very efficient (O(N^2))
1585 : * but given that it only works on list of tables given to us by user
1586 : * it's deemed acceptable.
1587 : */
2271 peter_e 1588 GIC 610 : if (list_member_oid(relids, myrelid))
2271 peter_e 1589 ECB : {
1590 : /* Disallow duplicate tables if there are any with row filters. */
411 akapila 1591 GIC 12 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
411 akapila 1592 CBC 6 : ereport(ERROR,
411 akapila 1593 ECB : (errcode(ERRCODE_DUPLICATE_OBJECT),
1594 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1595 : RelationGetRelationName(rel))));
1596 :
1597 : /* Disallow duplicate tables if there are any with column lists. */
379 tomas.vondra 1598 GIC 6 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
379 tomas.vondra 1599 CBC 6 : ereport(ERROR,
379 tomas.vondra 1600 ECB : (errcode(ERRCODE_DUPLICATE_OBJECT),
1601 : errmsg("conflicting or redundant column lists for table \"%s\"",
1602 : RelationGetRelationName(rel))));
1603 :
1539 andres 1604 UIC 0 : table_close(rel, ShareUpdateExclusiveLock);
2271 peter_e 1605 UBC 0 : continue;
2271 peter_e 1606 EUB : }
1607 :
580 alvherre 1608 GIC 598 : pub_rel = palloc(sizeof(PublicationRelInfo));
580 alvherre 1609 CBC 598 : pub_rel->relation = rel;
411 akapila 1610 598 : pub_rel->whereClause = t->whereClause;
379 tomas.vondra 1611 598 : pub_rel->columns = t->columns;
367 1612 598 : rels = lappend(rels, pub_rel);
2271 peter_e 1613 598 : relids = lappend_oid(relids, myrelid);
2271 peter_e 1614 ECB :
411 akapila 1615 GIC 598 : if (t->whereClause)
411 akapila 1616 CBC 186 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
411 akapila 1617 ECB :
379 tomas.vondra 1618 GIC 598 : if (t->columns)
379 tomas.vondra 1619 CBC 166 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
379 tomas.vondra 1620 ECB :
1621 : /*
1622 : * Add children of this rel, if requested, so that they too are added
1623 : * to the publication. A partitioned table can't have any inheritance
1624 : * children other than its partitions, which need not be explicitly
1625 : * added to the publication.
1626 : */
1125 peter 1627 GIC 598 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
2271 peter_e 1628 ECB : {
1629 : List *children;
1630 : ListCell *child;
1631 :
2271 peter_e 1632 GIC 507 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
2271 peter_e 1633 ECB : NULL);
1634 :
2271 peter_e 1635 GIC 1018 : foreach(child, children)
2271 peter_e 1636 ECB : {
2271 peter_e 1637 GIC 511 : Oid childrelid = lfirst_oid(child);
2271 peter_e 1638 ECB :
1639 : /* Allow query cancel in case this takes a long time */
1584 tgl 1640 GIC 511 : CHECK_FOR_INTERRUPTS();
2271 peter_e 1641 ECB :
1642 : /*
1643 : * Skip duplicates if user specified both parent and child
1644 : * tables.
1645 : */
2271 peter_e 1646 GIC 511 : if (list_member_oid(relids, childrelid))
411 akapila 1647 ECB : {
1648 : /*
1649 : * We don't allow to specify row filter for both parent
1650 : * and child table at the same time as it is not very
1651 : * clear which one should be given preference.
1652 : */
411 akapila 1653 GIC 507 : if (childrelid != myrelid &&
411 akapila 1654 LBC 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
411 akapila 1655 UBC 0 : ereport(ERROR,
411 akapila 1656 EUB : (errcode(ERRCODE_DUPLICATE_OBJECT),
1657 : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1658 : RelationGetRelationName(rel))));
1659 :
1660 : /*
1661 : * We don't allow to specify column list for both parent
1662 : * and child table at the same time as it is not very
1663 : * clear which one should be given preference.
1664 : */
379 tomas.vondra 1665 GIC 507 : if (childrelid != myrelid &&
379 tomas.vondra 1666 LBC 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
379 tomas.vondra 1667 UBC 0 : ereport(ERROR,
379 tomas.vondra 1668 EUB : (errcode(ERRCODE_DUPLICATE_OBJECT),
1669 : errmsg("conflicting or redundant column lists for table \"%s\"",
1670 : RelationGetRelationName(rel))));
1671 :
2271 peter_e 1672 GIC 507 : continue;
411 akapila 1673 ECB : }
1674 :
1675 : /* find_all_inheritors already got lock */
1539 andres 1676 GIC 4 : rel = table_open(childrelid, NoLock);
580 alvherre 1677 CBC 4 : pub_rel = palloc(sizeof(PublicationRelInfo));
1678 4 : pub_rel->relation = rel;
411 akapila 1679 ECB : /* child inherits WHERE clause from parent */
411 akapila 1680 GIC 4 : pub_rel->whereClause = t->whereClause;
367 tomas.vondra 1681 ECB :
1682 : /* child inherits column list from parent */
379 tomas.vondra 1683 GIC 4 : pub_rel->columns = t->columns;
367 tomas.vondra 1684 CBC 4 : rels = lappend(rels, pub_rel);
2271 peter_e 1685 4 : relids = lappend_oid(relids, childrelid);
411 akapila 1686 ECB :
411 akapila 1687 GIC 4 : if (t->whereClause)
411 akapila 1688 CBC 1 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
379 tomas.vondra 1689 ECB :
379 tomas.vondra 1690 GIC 4 : if (t->columns)
379 tomas.vondra 1691 LBC 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
2271 peter_e 1692 EUB : }
1693 : }
1694 : }
1695 :
2271 peter_e 1696 GIC 557 : list_free(relids);
411 akapila 1697 CBC 557 : list_free(relids_with_rf);
2271 peter_e 1698 ECB :
367 tomas.vondra 1699 GIC 557 : return rels;
2271 peter_e 1700 ECB : }
1701 :
1702 : /*
1703 : * Close all relations in the list.
1704 : */
1705 : static void
367 tomas.vondra 1706 GIC 668 : CloseTableList(List *rels)
2271 peter_e 1707 ECB : {
1708 : ListCell *lc;
1709 :
2271 peter_e 1710 GIC 1354 : foreach(lc, rels)
2271 peter_e 1711 ECB : {
1712 : PublicationRelInfo *pub_rel;
1713 :
580 alvherre 1714 GIC 686 : pub_rel = (PublicationRelInfo *) lfirst(lc);
580 alvherre 1715 CBC 686 : table_close(pub_rel->relation, NoLock);
2271 peter_e 1716 ECB : }
1717 :
411 akapila 1718 GIC 668 : list_free_deep(rels);
2271 peter_e 1719 CBC 668 : }
2271 peter_e 1720 ECB :
1721 : /*
1722 : * Lock the schemas specified in the schema list in AccessShareLock mode in
1723 : * order to prevent concurrent schema deletion.
1724 : */
1725 : static void
529 akapila 1726 GIC 492 : LockSchemaList(List *schemalist)
529 akapila 1727 ECB : {
1728 : ListCell *lc;
1729 :
529 akapila 1730 GIC 634 : foreach(lc, schemalist)
529 akapila 1731 ECB : {
529 akapila 1732 GIC 142 : Oid schemaid = lfirst_oid(lc);
529 akapila 1733 ECB :
1734 : /* Allow query cancel in case this takes a long time */
529 akapila 1735 GIC 142 : CHECK_FOR_INTERRUPTS();
529 akapila 1736 CBC 142 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
529 akapila 1737 ECB :
1738 : /*
1739 : * It is possible that by the time we acquire the lock on schema,
1740 : * concurrent DDL has removed it. We can test this by checking the
1741 : * existence of schema.
1742 : */
529 akapila 1743 GIC 142 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
529 akapila 1744 LBC 0 : ereport(ERROR,
529 akapila 1745 EUB : errcode(ERRCODE_UNDEFINED_SCHEMA),
1746 : errmsg("schema with OID %u does not exist", schemaid));
1747 : }
529 akapila 1748 GIC 492 : }
529 akapila 1749 ECB :
1750 : /*
1751 : * Add listed tables to the publication.
1752 : */
1753 : static void
367 tomas.vondra 1754 GIC 466 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
2271 peter_e 1755 ECB : AlterPublicationStmt *stmt)
1756 : {
1757 : ListCell *lc;
1758 :
367 tomas.vondra 1759 GIC 466 : Assert(!stmt || !stmt->for_all_tables);
2271 peter_e 1760 ECB :
2271 peter_e 1761 GIC 931 : foreach(lc, rels)
2271 peter_e 1762 ECB : {
580 alvherre 1763 GIC 496 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
580 alvherre 1764 CBC 496 : Relation rel = pub_rel->relation;
2153 bruce 1765 ECB : ObjectAddress obj;
1766 :
1767 : /* Must be owner of the table or superuser. */
147 peter 1768 GNC 496 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1954 peter_e 1769 CBC 3 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
2271 1770 3 : RelationGetRelationName(rel));
2271 peter_e 1771 ECB :
580 alvherre 1772 GIC 493 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
2271 peter_e 1773 CBC 465 : if (stmt)
2271 peter_e 1774 ECB : {
2271 peter_e 1775 GIC 293 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2271 peter_e 1776 ECB : (Node *) stmt);
1777 :
2271 peter_e 1778 GIC 293 : InvokeObjectPostCreateHook(PublicationRelRelationId,
2271 peter_e 1779 ECB : obj.objectId, 0);
1780 : }
1781 : }
2271 peter_e 1782 GIC 435 : }
2271 peter_e 1783 ECB :
1784 : /*
1785 : * Remove listed tables from the publication.
1786 : */
1787 : static void
367 tomas.vondra 1788 GIC 242 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
2271 peter_e 1789 ECB : {
1790 : ObjectAddress obj;
1791 : ListCell *lc;
1792 : Oid prid;
1793 :
2271 peter_e 1794 GIC 463 : foreach(lc, rels)
2271 peter_e 1795 ECB : {
580 alvherre 1796 GIC 230 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
580 alvherre 1797 CBC 230 : Relation rel = pubrel->relation;
2271 peter_e 1798 230 : Oid relid = RelationGetRelid(rel);
2271 peter_e 1799 ECB :
379 tomas.vondra 1800 GIC 230 : if (pubrel->columns)
379 tomas.vondra 1801 LBC 0 : ereport(ERROR,
379 tomas.vondra 1802 EUB : errcode(ERRCODE_SYNTAX_ERROR),
1803 : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1804 :
1601 andres 1805 GIC 230 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1601 andres 1806 ECB : ObjectIdGetDatum(relid),
1807 : ObjectIdGetDatum(pubid));
2271 peter_e 1808 GIC 230 : if (!OidIsValid(prid))
2271 peter_e 1809 ECB : {
2271 peter_e 1810 GIC 6 : if (missing_ok)
2271 peter_e 1811 LBC 0 : continue;
2271 peter_e 1812 EUB :
2271 peter_e 1813 GIC 6 : ereport(ERROR,
2271 peter_e 1814 ECB : (errcode(ERRCODE_UNDEFINED_OBJECT),
1815 : errmsg("relation \"%s\" is not part of the publication",
1816 : RelationGetRelationName(rel))));
1817 : }
1818 :
411 akapila 1819 GIC 224 : if (pubrel->whereClause)
411 akapila 1820 CBC 3 : ereport(ERROR,
411 akapila 1821 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
1822 : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1823 :
2271 peter_e 1824 GIC 221 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
2271 peter_e 1825 CBC 221 : performDeletion(&obj, DROP_CASCADE, 0);
2271 peter_e 1826 ECB : }
2271 peter_e 1827 GIC 233 : }
2271 peter_e 1828 ECB :
1829 : /*
1830 : * Add listed schemas to the publication.
1831 : */
1832 : static void
367 tomas.vondra 1833 GIC 274 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
367 tomas.vondra 1834 ECB : AlterPublicationStmt *stmt)
1835 : {
1836 : ListCell *lc;
1837 :
367 tomas.vondra 1838 GIC 274 : Assert(!stmt || !stmt->for_all_tables);
529 akapila 1839 ECB :
529 akapila 1840 GIC 379 : foreach(lc, schemas)
529 akapila 1841 ECB : {
529 akapila 1842 GIC 111 : Oid schemaid = lfirst_oid(lc);
529 akapila 1843 ECB : ObjectAddress obj;
1844 :
367 tomas.vondra 1845 GIC 111 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
529 akapila 1846 CBC 105 : if (stmt)
529 akapila 1847 ECB : {
529 akapila 1848 GIC 29 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
529 akapila 1849 ECB : (Node *) stmt);
1850 :
529 akapila 1851 GIC 29 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
529 akapila 1852 ECB : obj.objectId, 0);
1853 : }
1854 : }
529 akapila 1855 GIC 268 : }
529 akapila 1856 ECB :
1857 : /*
1858 : * Remove listed schemas from the publication.
1859 : */
1860 : static void
367 tomas.vondra 1861 GIC 215 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
529 akapila 1862 ECB : {
1863 : ObjectAddress obj;
1864 : ListCell *lc;
1865 : Oid psid;
1866 :
529 akapila 1867 GIC 240 : foreach(lc, schemas)
529 akapila 1868 ECB : {
529 akapila 1869 GIC 28 : Oid schemaid = lfirst_oid(lc);
529 akapila 1870 ECB :
367 tomas.vondra 1871 GIC 28 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
529 akapila 1872 ECB : Anum_pg_publication_namespace_oid,
1873 : ObjectIdGetDatum(schemaid),
1874 : ObjectIdGetDatum(pubid));
529 akapila 1875 GIC 28 : if (!OidIsValid(psid))
529 akapila 1876 ECB : {
529 akapila 1877 GIC 3 : if (missing_ok)
529 akapila 1878 LBC 0 : continue;
529 akapila 1879 EUB :
529 akapila 1880 GIC 3 : ereport(ERROR,
529 akapila 1881 ECB : (errcode(ERRCODE_UNDEFINED_OBJECT),
1882 : errmsg("tables from schema \"%s\" are not part of the publication",
1883 : get_namespace_name(schemaid))));
1884 : }
1885 :
529 akapila 1886 GIC 25 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
529 akapila 1887 CBC 25 : performDeletion(&obj, DROP_CASCADE, 0);
529 akapila 1888 ECB : }
529 akapila 1889 GIC 212 : }
529 akapila 1890 ECB :
1891 : /*
1892 : * Internal workhorse for changing a publication owner
1893 : */
1894 : static void
2271 peter_e 1895 GIC 12 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2271 peter_e 1896 ECB : {
1897 : Form_pg_publication form;
1898 :
2271 peter_e 1899 GIC 12 : form = (Form_pg_publication) GETSTRUCT(tup);
2271 peter_e 1900 ECB :
2271 peter_e 1901 GIC 12 : if (form->pubowner == newOwnerId)
2271 peter_e 1902 LBC 0 : return;
2271 peter_e 1903 EUB :
2246 peter_e 1904 GIC 12 : if (!superuser())
2246 peter_e 1905 ECB : {
1906 : AclResult aclresult;
1907 :
1908 : /* Must be owner */
147 peter 1909 GNC 6 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1954 peter_e 1910 LBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2246 peter_e 1911 UBC 0 : NameStr(form->pubname));
2246 peter_e 1912 EUB :
1913 : /* Must be able to become new owner */
142 rhaas 1914 GNC 6 : check_can_set_role(GetUserId(), newOwnerId);
2246 peter_e 1915 ECB :
1916 : /* New owner must have CREATE privilege on database */
147 peter 1917 GNC 6 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2246 peter_e 1918 CBC 6 : if (aclresult != ACLCHECK_OK)
1954 peter_e 1919 LBC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2246 peter_e 1920 UBC 0 : get_database_name(MyDatabaseId));
2246 peter_e 1921 EUB :
2246 peter_e 1922 GIC 6 : if (form->puballtables && !superuser_arg(newOwnerId))
2246 peter_e 1923 LBC 0 : ereport(ERROR,
2246 peter_e 1924 EUB : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1925 : errmsg("permission denied to change owner of publication \"%s\"",
1926 : NameStr(form->pubname)),
1927 : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1928 :
487 akapila 1929 GIC 6 : if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
487 akapila 1930 CBC 3 : ereport(ERROR,
487 akapila 1931 ECB : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1932 : errmsg("permission denied to change owner of publication \"%s\"",
1933 : NameStr(form->pubname)),
1934 : errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1935 : }
1936 :
2271 peter_e 1937 GIC 9 : form->pubowner = newOwnerId;
2259 alvherre 1938 CBC 9 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2271 peter_e 1939 ECB :
1940 : /* Update owner dependency reference */
2271 peter_e 1941 GIC 9 : changeDependencyOnOwner(PublicationRelationId,
1601 andres 1942 ECB : form->oid,
1943 : newOwnerId);
1944 :
2271 peter_e 1945 GIC 9 : InvokeObjectPostAlterHook(PublicationRelationId,
1601 andres 1946 ECB : form->oid, 0);
1947 : }
1948 :
1949 : /*
1950 : * Change publication owner -- by name
1951 : */
1952 : ObjectAddress
2271 peter_e 1953 GIC 12 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2271 peter_e 1954 ECB : {
1955 : Oid subid;
1956 : HeapTuple tup;
1957 : Relation rel;
1958 : ObjectAddress address;
1959 : Form_pg_publication pubform;
1960 :
1539 andres 1961 GIC 12 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2271 peter_e 1962 ECB :
2271 peter_e 1963 GIC 12 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2271 peter_e 1964 ECB :
2271 peter_e 1965 GIC 12 : if (!HeapTupleIsValid(tup))
2271 peter_e 1966 LBC 0 : ereport(ERROR,
2271 peter_e 1967 EUB : (errcode(ERRCODE_UNDEFINED_OBJECT),
1968 : errmsg("publication \"%s\" does not exist", name)));
1969 :
1601 andres 1970 GIC 12 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1601 andres 1971 CBC 12 : subid = pubform->oid;
2271 peter_e 1972 ECB :
2271 peter_e 1973 GIC 12 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2271 peter_e 1974 ECB :
2271 peter_e 1975 GIC 9 : ObjectAddressSet(address, PublicationRelationId, subid);
2271 peter_e 1976 ECB :
2271 peter_e 1977 GIC 9 : heap_freetuple(tup);
2271 peter_e 1978 ECB :
1539 andres 1979 GIC 9 : table_close(rel, RowExclusiveLock);
2271 peter_e 1980 ECB :
2271 peter_e 1981 GIC 9 : return address;
2271 peter_e 1982 ECB : }
1983 :
1984 : /*
1985 : * Change publication owner -- by OID
1986 : */
1987 : void
2271 peter_e 1988 UIC 0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
2271 peter_e 1989 EUB : {
1990 : HeapTuple tup;
1991 : Relation rel;
1992 :
1539 andres 1993 UIC 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2271 peter_e 1994 EUB :
2271 peter_e 1995 UIC 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
2271 peter_e 1996 EUB :
2271 peter_e 1997 UIC 0 : if (!HeapTupleIsValid(tup))
2271 peter_e 1998 UBC 0 : ereport(ERROR,
2271 peter_e 1999 EUB : (errcode(ERRCODE_UNDEFINED_OBJECT),
2000 : errmsg("publication with OID %u does not exist", subid)));
2001 :
2271 peter_e 2002 UIC 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2271 peter_e 2003 EUB :
2271 peter_e 2004 UIC 0 : heap_freetuple(tup);
2271 peter_e 2005 EUB :
1539 andres 2006 UIC 0 : table_close(rel, RowExclusiveLock);
2271 peter_e 2007 UBC 0 : }
|