Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * publicationcmds.c
4 : : * publication manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/publicationcmds.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/htup_details.h"
18 : : #include "access/table.h"
19 : : #include "access/xact.h"
20 : : #include "catalog/catalog.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/namespace.h"
23 : : #include "catalog/objectaccess.h"
24 : : #include "catalog/objectaddress.h"
25 : : #include "catalog/pg_database.h"
26 : : #include "catalog/pg_inherits.h"
27 : : #include "catalog/pg_namespace.h"
28 : : #include "catalog/pg_proc.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "commands/dbcommands.h"
33 : : #include "commands/defrem.h"
34 : : #include "commands/event_trigger.h"
35 : : #include "commands/publicationcmds.h"
36 : : #include "miscadmin.h"
37 : : #include "nodes/nodeFuncs.h"
38 : : #include "parser/parse_clause.h"
39 : : #include "parser/parse_collate.h"
40 : : #include "parser/parse_relation.h"
41 : : #include "storage/lmgr.h"
42 : : #include "utils/acl.h"
43 : : #include "utils/builtins.h"
44 : : #include "utils/inval.h"
45 : : #include "utils/lsyscache.h"
46 : : #include "utils/rel.h"
47 : : #include "utils/syscache.h"
48 : : #include "utils/varlena.h"
49 : :
50 : :
51 : : /*
52 : : * Information used to validate the columns in the row filter expression. See
53 : : * contain_invalid_rfcolumn_walker for details.
54 : : */
55 : : typedef struct rf_context
56 : : {
57 : : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : : bool pubviaroot; /* true if we are validating the parent
59 : : * relation's row filter */
60 : : Oid relid; /* relid of the relation */
61 : : Oid parentid; /* relid of the parent relation */
62 : : } rf_context;
63 : :
64 : : static List *OpenTableList(List *tables);
65 : : static void CloseTableList(List *rels);
66 : : static void LockSchemaList(List *schemalist);
67 : : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : : AlterPublicationStmt *stmt);
69 : : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : : AlterPublicationStmt *stmt);
72 : : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : :
74 : :
75 : : static void
1004 dean.a.rasheed@gmail 76 :CBC 391 : parse_publication_options(ParseState *pstate,
77 : : List *options,
78 : : bool *publish_given,
79 : : PublicationActions *pubactions,
80 : : bool *publish_via_partition_root_given,
81 : : bool *publish_via_partition_root)
82 : : {
83 : : ListCell *lc;
84 : :
2529 peter_e@gmx.net 85 : 391 : *publish_given = false;
1467 peter@eisentraut.org 86 : 391 : *publish_via_partition_root_given = false;
87 : :
88 : : /* defaults */
89 : 391 : pubactions->pubinsert = true;
90 : 391 : pubactions->pubupdate = true;
91 : 391 : pubactions->pubdelete = true;
92 : 391 : pubactions->pubtruncate = true;
93 : 391 : *publish_via_partition_root = false;
94 : :
95 : : /* Parse options */
2524 bruce@momjian.us 96 [ + + + + : 527 : foreach(lc, options)
+ + ]
97 : : {
2642 peter_e@gmx.net 98 : 145 : DefElem *defel = (DefElem *) lfirst(lc);
99 : :
2529 100 [ + + ]: 145 : if (strcmp(defel->defname, "publish") == 0)
101 : : {
102 : : char *publish;
103 : : List *publish_list;
104 : : ListCell *lc2;
105 : :
106 [ - + ]: 61 : if (*publish_given)
1004 dean.a.rasheed@gmail 107 :UBC 0 : errorConflictingDefElem(defel, pstate);
108 : :
109 : : /*
110 : : * If publish option was given only the explicitly listed actions
111 : : * should be published.
112 : : */
1467 peter@eisentraut.org 113 :CBC 61 : pubactions->pubinsert = false;
114 : 61 : pubactions->pubupdate = false;
115 : 61 : pubactions->pubdelete = false;
116 : 61 : pubactions->pubtruncate = false;
117 : :
2529 peter_e@gmx.net 118 : 61 : *publish_given = true;
119 : 61 : publish = defGetString(defel);
120 : :
121 [ - + ]: 61 : if (!SplitIdentifierString(publish, ',', &publish_list))
2642 peter_e@gmx.net 122 [ # # ]:UBC 0 : ereport(ERROR,
123 : : (errcode(ERRCODE_SYNTAX_ERROR),
124 : : errmsg("invalid list syntax in parameter \"%s\"",
125 : : "publish")));
126 : :
127 : : /* Process the option list. */
557 drowley@postgresql.o 128 [ + + + + :CBC 135 : foreach(lc2, publish_list)
+ + ]
129 : : {
130 : 77 : char *publish_opt = (char *) lfirst(lc2);
131 : :
2529 peter_e@gmx.net 132 [ + + ]: 77 : if (strcmp(publish_opt, "insert") == 0)
1467 peter@eisentraut.org 133 : 54 : pubactions->pubinsert = true;
2529 peter_e@gmx.net 134 [ + + ]: 23 : else if (strcmp(publish_opt, "update") == 0)
1467 peter@eisentraut.org 135 : 10 : pubactions->pubupdate = true;
2529 peter_e@gmx.net 136 [ + + ]: 13 : else if (strcmp(publish_opt, "delete") == 0)
1467 peter@eisentraut.org 137 : 5 : pubactions->pubdelete = true;
2199 peter_e@gmx.net 138 [ + + ]: 8 : else if (strcmp(publish_opt, "truncate") == 0)
1467 peter@eisentraut.org 139 : 5 : pubactions->pubtruncate = true;
140 : : else
2529 peter_e@gmx.net 141 [ + - ]: 3 : ereport(ERROR,
142 : : (errcode(ERRCODE_SYNTAX_ERROR),
143 : : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
144 : : "publish", publish_opt)));
145 : : }
146 : : }
1467 peter@eisentraut.org 147 [ + + ]: 84 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
148 : : {
149 [ + + ]: 81 : if (*publish_via_partition_root_given)
1004 dean.a.rasheed@gmail 150 : 3 : errorConflictingDefElem(defel, pstate);
1467 peter@eisentraut.org 151 : 78 : *publish_via_partition_root_given = true;
152 : 78 : *publish_via_partition_root = defGetBoolean(defel);
153 : : }
154 : : else
2529 peter_e@gmx.net 155 [ + - ]: 3 : ereport(ERROR,
156 : : (errcode(ERRCODE_SYNTAX_ERROR),
157 : : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
158 : : }
2642 159 : 382 : }
160 : :
161 : : /*
162 : : * Convert the PublicationObjSpecType list into schema oid list and
163 : : * PublicationTable list.
164 : : */
165 : : static void
900 akapila@postgresql.o 166 : 750 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
167 : : List **rels, List **schemas)
168 : : {
169 : : ListCell *cell;
170 : : PublicationObjSpec *pubobj;
171 : :
172 [ + + ]: 750 : if (!pubobjspec_list)
173 : 42 : return;
174 : :
175 [ + - + + : 1498 : foreach(cell, pubobjspec_list)
+ + ]
176 : : {
177 : : Oid schemaid;
178 : : List *search_path;
179 : :
180 : 808 : pubobj = (PublicationObjSpec *) lfirst(cell);
181 : :
182 [ + + + - ]: 808 : switch (pubobj->pubobjtype)
183 : : {
184 : 630 : case PUBLICATIONOBJ_TABLE:
738 tomas.vondra@postgre 185 : 630 : *rels = lappend(*rels, pubobj->pubtable);
900 akapila@postgresql.o 186 : 630 : break;
836 alvherre@alvh.no-ip. 187 : 166 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
900 akapila@postgresql.o 188 : 166 : schemaid = get_namespace_oid(pubobj->name, false);
189 : :
190 : : /* Filter out duplicates if user specifies "sch1, sch1" */
738 tomas.vondra@postgre 191 : 151 : *schemas = list_append_unique_oid(*schemas, schemaid);
900 akapila@postgresql.o 192 : 151 : break;
836 alvherre@alvh.no-ip. 193 : 12 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
900 akapila@postgresql.o 194 : 12 : search_path = fetch_search_path(false);
195 [ + + ]: 12 : if (search_path == NIL) /* nothing valid in search_path? */
196 [ + - ]: 3 : ereport(ERROR,
197 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
198 : : errmsg("no schema has been selected for CURRENT_SCHEMA"));
199 : :
200 : 9 : schemaid = linitial_oid(search_path);
201 : 9 : list_free(search_path);
202 : :
203 : : /* Filter out duplicates if user specifies "sch1, sch1" */
738 tomas.vondra@postgre 204 : 9 : *schemas = list_append_unique_oid(*schemas, schemaid);
900 akapila@postgresql.o 205 : 9 : break;
900 akapila@postgresql.o 206 :UBC 0 : default:
207 : : /* shouldn't happen */
208 [ # # ]: 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
209 : : break;
210 : : }
211 : : }
212 : : }
213 : :
214 : : /*
215 : : * Returns true if any of the columns used in the row filter WHERE expression is
216 : : * not part of REPLICA IDENTITY, false otherwise.
217 : : */
218 : : static bool
782 akapila@postgresql.o 219 :CBC 129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
220 : : {
221 [ - + ]: 129 : if (node == NULL)
782 akapila@postgresql.o 222 :UBC 0 : return false;
223 : :
782 akapila@postgresql.o 224 [ + + ]:CBC 129 : if (IsA(node, Var))
225 : : {
226 : 52 : Var *var = (Var *) node;
227 : 52 : AttrNumber attnum = var->varattno;
228 : :
229 : : /*
230 : : * If pubviaroot is true, we are validating the row filter of the
231 : : * parent table, but the bitmap contains the replica identity
232 : : * information of the child table. So, get the column number of the
233 : : * child table as parent and child column order could be different.
234 : : */
235 [ + + ]: 52 : if (context->pubviaroot)
236 : : {
237 : 8 : char *colname = get_attname(context->parentid, attnum, false);
238 : :
239 : 8 : attnum = get_attnum(context->relid, colname);
240 : : }
241 : :
242 [ + + ]: 52 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
243 : 52 : context->bms_replident))
244 : 30 : return true;
245 : : }
246 : :
247 : 99 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
248 : : (void *) context);
249 : : }
250 : :
251 : : /*
252 : : * Check if all columns referenced in the filter expression are part of the
253 : : * REPLICA IDENTITY index or not.
254 : : *
255 : : * Returns true if any invalid column is found.
256 : : */
257 : : bool
750 tomas.vondra@postgre 258 : 331 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
259 : : bool pubviaroot)
260 : : {
261 : : HeapTuple rftuple;
782 akapila@postgresql.o 262 : 331 : Oid relid = RelationGetRelid(relation);
263 : 331 : Oid publish_as_relid = RelationGetRelid(relation);
264 : 331 : bool result = false;
265 : : Datum rfdatum;
266 : : bool rfisnull;
267 : :
268 : : /*
269 : : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
270 : : * allowed in the row filter and we can skip the validation.
271 : : */
272 [ + + ]: 331 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
273 : 78 : return false;
274 : :
275 : : /*
276 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
277 : : * is published via this publication as we need to use its row filter
278 : : * expression to filter the partition's changes.
279 : : *
280 : : * Note that even though the row filter used is for an ancestor, the
281 : : * REPLICA IDENTITY used will be for the actual child table.
282 : : */
283 [ + + + + ]: 253 : if (pubviaroot && relation->rd_rel->relispartition)
284 : : {
285 : : publish_as_relid
760 tomas.vondra@postgre 286 : 53 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
287 : :
782 akapila@postgresql.o 288 [ + + ]: 53 : if (!OidIsValid(publish_as_relid))
289 : 3 : publish_as_relid = relid;
290 : : }
291 : :
292 : 253 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
293 : : ObjectIdGetDatum(publish_as_relid),
294 : : ObjectIdGetDatum(pubid));
295 : :
296 [ + + ]: 253 : if (!HeapTupleIsValid(rftuple))
297 : 28 : return false;
298 : :
299 : 225 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
300 : : Anum_pg_publication_rel_prqual,
301 : : &rfisnull);
302 : :
303 [ + + ]: 225 : if (!rfisnull)
304 : : {
305 : 51 : rf_context context = {0};
306 : : Node *rfnode;
307 : 51 : Bitmapset *bms = NULL;
308 : :
309 : 51 : context.pubviaroot = pubviaroot;
310 : 51 : context.parentid = publish_as_relid;
311 : 51 : context.relid = relid;
312 : :
313 : : /* Remember columns that are part of the REPLICA IDENTITY */
314 : 51 : bms = RelationGetIndexAttrBitmap(relation,
315 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
316 : :
317 : 51 : context.bms_replident = bms;
318 : 51 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
319 : 51 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
320 : : }
321 : :
322 : 225 : ReleaseSysCache(rftuple);
323 : :
324 : 225 : return result;
325 : : }
326 : :
327 : : /*
328 : : * Check if all columns referenced in the REPLICA IDENTITY are covered by
329 : : * the column list.
330 : : *
331 : : * Returns true if any replica identity column is not covered by column list.
332 : : */
333 : : bool
750 tomas.vondra@postgre 334 : 331 : pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
335 : : bool pubviaroot)
336 : : {
337 : : HeapTuple tuple;
338 : 331 : Oid relid = RelationGetRelid(relation);
339 : 331 : Oid publish_as_relid = RelationGetRelid(relation);
340 : 331 : bool result = false;
341 : : Datum datum;
342 : : bool isnull;
343 : :
344 : : /*
345 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
346 : : * is published via this publication as we need to use its column list for
347 : : * the changes.
348 : : *
349 : : * Note that even though the column list used is for an ancestor, the
350 : : * REPLICA IDENTITY used will be for the actual child table.
351 : : */
352 [ + + + + ]: 331 : if (pubviaroot && relation->rd_rel->relispartition)
353 : : {
354 : 64 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
355 : :
356 [ + + ]: 64 : if (!OidIsValid(publish_as_relid))
357 : 3 : publish_as_relid = relid;
358 : : }
359 : :
360 : 331 : tuple = SearchSysCache2(PUBLICATIONRELMAP,
361 : : ObjectIdGetDatum(publish_as_relid),
362 : : ObjectIdGetDatum(pubid));
363 : :
364 [ + + ]: 331 : if (!HeapTupleIsValid(tuple))
365 : 32 : return false;
366 : :
367 : 299 : datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
368 : : Anum_pg_publication_rel_prattrs,
369 : : &isnull);
370 : :
371 [ + + ]: 299 : if (!isnull)
372 : : {
373 : : int x;
374 : : Bitmapset *idattrs;
375 : 115 : Bitmapset *columns = NULL;
376 : :
377 : : /* With REPLICA IDENTITY FULL, no column list is allowed. */
378 [ + + ]: 115 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
379 : 18 : result = true;
380 : :
381 : : /* Transform the column list datum to a bitmapset. */
382 : 115 : columns = pub_collist_to_bitmapset(NULL, datum, NULL);
383 : :
384 : : /* Remember columns that are part of the REPLICA IDENTITY */
385 : 115 : idattrs = RelationGetIndexAttrBitmap(relation,
386 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
387 : :
388 : : /*
389 : : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
390 : : * offset (to handle system columns the usual way), while column list
391 : : * does not use offset, so we can't do bms_is_subset(). Instead, we
392 : : * have to loop over the idattrs and check all of them are in the
393 : : * list.
394 : : */
395 : 115 : x = -1;
396 [ + + ]: 175 : while ((x = bms_next_member(idattrs, x)) >= 0)
397 : : {
398 : 96 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
399 : :
400 : : /*
401 : : * If pubviaroot is true, we are validating the column list of the
402 : : * parent table, but the bitmap contains the replica identity
403 : : * information of the child table. The parent/child attnums may
404 : : * not match, so translate them to the parent - get the attname
405 : : * from the child, and look it up in the parent.
406 : : */
407 [ + + ]: 96 : if (pubviaroot)
408 : : {
409 : : /* attribute name in the child table */
703 tgl@sss.pgh.pa.us 410 : 40 : char *colname = get_attname(relid, attnum, false);
411 : :
412 : : /*
413 : : * Determine the attnum for the attribute name in parent (we
414 : : * are using the column list defined on the parent).
415 : : */
750 tomas.vondra@postgre 416 : 40 : attnum = get_attnum(publish_as_relid, colname);
417 : : }
418 : :
419 : : /* replica identity column, not covered by the column list */
420 [ + + ]: 96 : if (!bms_is_member(attnum, columns))
421 : : {
422 : 36 : result = true;
423 : 36 : break;
424 : : }
425 : : }
426 : :
427 : 115 : bms_free(idattrs);
428 : 115 : bms_free(columns);
429 : : }
430 : :
431 : 299 : ReleaseSysCache(tuple);
432 : :
433 : 299 : return result;
434 : : }
435 : :
436 : : /* check_functions_in_node callback */
437 : : static bool
782 akapila@postgresql.o 438 : 198 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
439 : : {
440 [ + + - + ]: 198 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
441 : : func_id >= FirstNormalObjectId);
442 : : }
443 : :
444 : : /*
445 : : * The row filter walker checks if the row filter expression is a "simple
446 : : * expression".
447 : : *
448 : : * It allows only simple or compound expressions such as:
449 : : * - (Var Op Const)
450 : : * - (Var Op Var)
451 : : * - (Var Op Const) AND/OR (Var Op Const)
452 : : * - etc
453 : : * (where Var is a column of the table this filter belongs to)
454 : : *
455 : : * The simple expression has the following restrictions:
456 : : * - User-defined operators are not allowed;
457 : : * - User-defined functions are not allowed;
458 : : * - User-defined types are not allowed;
459 : : * - User-defined collations are not allowed;
460 : : * - Non-immutable built-in functions are not allowed;
461 : : * - System columns are not allowed.
462 : : *
463 : : * NOTES
464 : : *
465 : : * We don't allow user-defined functions/operators/types/collations because
466 : : * (a) if a user drops a user-defined object used in a row filter expression or
467 : : * if there is any other error while using it, the logical decoding
468 : : * infrastructure won't be able to recover from such an error even if the
469 : : * object is recreated again because a historic snapshot is used to evaluate
470 : : * the row filter;
471 : : * (b) a user-defined function can be used to access tables that could have
472 : : * unpleasant results because a historic snapshot is used. That's why only
473 : : * immutable built-in functions are allowed in row filter expressions.
474 : : *
475 : : * We don't allow system columns because currently, we don't have that
476 : : * information in the tuple passed to downstream. Also, as we don't replicate
477 : : * those to subscribers, there doesn't seem to be a need for a filter on those
478 : : * columns.
479 : : *
480 : : * We can allow other node types after more analysis and testing.
481 : : */
482 : : static bool
483 : 664 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
484 : : {
485 : 664 : char *errdetail_msg = NULL;
486 : :
487 [ + + ]: 664 : if (node == NULL)
488 : 3 : return false;
489 : :
490 [ + + + + : 661 : switch (nodeTag(node))
+ + ]
491 : : {
492 : 178 : case T_Var:
493 : : /* System columns are not allowed. */
494 [ + + ]: 178 : if (((Var *) node)->varattno < InvalidAttrNumber)
495 : 3 : errdetail_msg = _("System columns are not allowed.");
496 : 178 : break;
497 : 176 : case T_OpExpr:
498 : : case T_DistinctExpr:
499 : : case T_NullIfExpr:
500 : : /* OK, except user-defined operators are not allowed. */
501 [ + + ]: 176 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
502 : 3 : errdetail_msg = _("User-defined operators are not allowed.");
503 : 176 : break;
504 : 3 : case T_ScalarArrayOpExpr:
505 : : /* OK, except user-defined operators are not allowed. */
506 [ - + ]: 3 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
782 akapila@postgresql.o 507 :UBC 0 : errdetail_msg = _("User-defined operators are not allowed.");
508 : :
509 : : /*
510 : : * We don't need to check the hashfuncid and negfuncid of
511 : : * ScalarArrayOpExpr as those functions are only built for a
512 : : * subquery.
513 : : */
782 akapila@postgresql.o 514 :CBC 3 : break;
515 : 3 : case T_RowCompareExpr:
516 : : {
517 : : ListCell *opid;
518 : :
519 : : /* OK, except user-defined operators are not allowed. */
520 [ + - + + : 9 : foreach(opid, ((RowCompareExpr *) node)->opnos)
+ + ]
521 : : {
522 [ - + ]: 6 : if (lfirst_oid(opid) >= FirstNormalObjectId)
523 : : {
782 akapila@postgresql.o 524 :UBC 0 : errdetail_msg = _("User-defined operators are not allowed.");
525 : 0 : break;
526 : : }
527 : : }
528 : : }
782 akapila@postgresql.o 529 :CBC 3 : break;
530 : 298 : case T_Const:
531 : : case T_FuncExpr:
532 : : case T_BoolExpr:
533 : : case T_RelabelType:
534 : : case T_CollateExpr:
535 : : case T_CaseExpr:
536 : : case T_CaseTestExpr:
537 : : case T_ArrayExpr:
538 : : case T_RowExpr:
539 : : case T_CoalesceExpr:
540 : : case T_MinMaxExpr:
541 : : case T_XmlExpr:
542 : : case T_NullTest:
543 : : case T_BooleanTest:
544 : : case T_List:
545 : : /* OK, supported */
546 : 298 : break;
547 : 3 : default:
568 peter@eisentraut.org 548 : 3 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
782 akapila@postgresql.o 549 : 3 : break;
550 : : }
551 : :
552 : : /*
553 : : * For all the supported nodes, if we haven't already found a problem,
554 : : * check the types, functions, and collations used in it. We check List
555 : : * by walking through each element.
556 : : */
564 alvherre@alvh.no-ip. 557 [ + + + + ]: 661 : if (!errdetail_msg && !IsA(node, List))
558 : : {
559 [ + + ]: 625 : if (exprType(node) >= FirstNormalObjectId)
560 : 3 : errdetail_msg = _("User-defined types are not allowed.");
561 [ + + ]: 622 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
562 : : (void *) pstate))
563 : 6 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
564 [ + - + + ]: 1232 : else if (exprCollation(node) >= FirstNormalObjectId ||
565 : 616 : exprInputCollation(node) >= FirstNormalObjectId)
566 : 3 : errdetail_msg = _("User-defined collations are not allowed.");
567 : : }
568 : :
569 : : /*
570 : : * If we found a problem in this node, throw error now. Otherwise keep
571 : : * going.
572 : : */
782 akapila@postgresql.o 573 [ + + ]: 661 : if (errdetail_msg)
574 [ + - ]: 21 : ereport(ERROR,
575 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
576 : : errmsg("invalid publication WHERE expression"),
577 : : errdetail_internal("%s", errdetail_msg),
578 : : parser_errposition(pstate, exprLocation(node))));
579 : :
580 : 640 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
581 : : (void *) pstate);
582 : : }
583 : :
584 : : /*
585 : : * Check if the row filter expression is a "simple expression".
586 : : *
587 : : * See check_simple_rowfilter_expr_walker for details.
588 : : */
589 : : static bool
590 : 172 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
591 : : {
592 : 172 : return check_simple_rowfilter_expr_walker(node, pstate);
593 : : }
594 : :
595 : : /*
596 : : * Transform the publication WHERE expression for all the relations in the list,
597 : : * ensuring it is coerced to boolean and necessary collation information is
598 : : * added if required, and add a new nsitem/RTE for the associated relation to
599 : : * the ParseState's namespace list.
600 : : *
601 : : * Also check the publication row filter expression and throw an error if
602 : : * anything not permitted or unexpected is encountered.
603 : : */
604 : : static void
605 : 521 : TransformPubWhereClauses(List *tables, const char *queryString,
606 : : bool pubviaroot)
607 : : {
608 : : ListCell *lc;
609 : :
610 [ + + + + : 1043 : foreach(lc, tables)
+ + ]
611 : : {
612 : : ParseNamespaceItem *nsitem;
613 : 552 : Node *whereclause = NULL;
614 : : ParseState *pstate;
615 : 552 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
616 : :
617 [ + + ]: 552 : if (pri->whereClause == NULL)
618 : 371 : continue;
619 : :
620 : : /*
621 : : * If the publication doesn't publish changes via the root partitioned
622 : : * table, the partition's row filter will be used. So disallow using
623 : : * WHERE clause on partitioned table in this case.
624 : : */
625 [ + + ]: 181 : if (!pubviaroot &&
626 [ + + ]: 170 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
627 [ + - ]: 3 : ereport(ERROR,
628 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629 : : errmsg("cannot use publication WHERE clause for relation \"%s\"",
630 : : RelationGetRelationName(pri->relation)),
631 : : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
632 : : "publish_via_partition_root")));
633 : :
634 : : /*
635 : : * A fresh pstate is required so that we only have "this" table in its
636 : : * rangetable
637 : : */
638 : 178 : pstate = make_parsestate(NULL);
639 : 178 : pstate->p_sourcetext = queryString;
640 : 178 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
641 : : AccessShareLock, NULL,
642 : : false, false);
643 : 178 : addNSItemToQuery(pstate, nsitem, false, true, true);
644 : :
645 : 178 : whereclause = transformWhereClause(pstate,
646 : 178 : copyObject(pri->whereClause),
647 : : EXPR_KIND_WHERE,
648 : : "PUBLICATION WHERE");
649 : :
650 : : /* Fix up collation information */
651 : 172 : assign_expr_collations(pstate, whereclause);
652 : :
653 : : /*
654 : : * We allow only simple expressions in row filters. See
655 : : * check_simple_rowfilter_expr_walker.
656 : : */
657 : 172 : check_simple_rowfilter_expr(whereclause, pstate);
658 : :
659 : 151 : free_parsestate(pstate);
660 : :
661 : 151 : pri->whereClause = whereclause;
662 : : }
663 : 491 : }
664 : :
665 : :
666 : : /*
667 : : * Given a list of tables that are going to be added to a publication,
668 : : * verify that they fulfill the necessary preconditions, namely: no tables
669 : : * have a column list if any schema is published; and partitioned tables do
670 : : * not have column lists if publish_via_partition_root is not set.
671 : : *
672 : : * 'publish_schema' indicates that the publication contains any TABLES IN
673 : : * SCHEMA elements (newly added in this command, or preexisting).
674 : : * 'pubviaroot' is the value of publish_via_partition_root.
675 : : */
676 : : static void
565 alvherre@alvh.no-ip. 677 : 491 : CheckPubRelationColumnList(char *pubname, List *tables,
678 : : bool publish_schema, bool pubviaroot)
679 : : {
680 : : ListCell *lc;
681 : :
750 tomas.vondra@postgre 682 [ + + + + : 998 : foreach(lc, tables)
+ + ]
683 : : {
684 : 522 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
685 : :
686 [ + + ]: 522 : if (pri->columns == NIL)
687 : 359 : continue;
688 : :
689 : : /*
690 : : * Disallow specifying column list if any schema is in the
691 : : * publication.
692 : : *
693 : : * XXX We could instead just forbid the case when the publication
694 : : * tries to publish the table with a column list and a schema for that
695 : : * table. However, if we do that then we need a restriction during
696 : : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
697 : : * seem to be a good idea.
698 : : */
569 akapila@postgresql.o 699 [ + + ]: 163 : if (publish_schema)
700 [ + - ]: 12 : ereport(ERROR,
701 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
702 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
703 : : get_namespace_name(RelationGetNamespace(pri->relation)),
704 : : RelationGetRelationName(pri->relation), pubname),
705 : : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
706 : :
707 : : /*
708 : : * If the publication doesn't publish changes via the root partitioned
709 : : * table, the partition's column list will be used. So disallow using
710 : : * a column list on the partitioned table in this case.
711 : : */
750 tomas.vondra@postgre 712 [ + + ]: 151 : if (!pubviaroot &&
713 [ + + ]: 113 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
714 [ + - ]: 3 : ereport(ERROR,
715 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
716 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
717 : : get_namespace_name(RelationGetNamespace(pri->relation)),
718 : : RelationGetRelationName(pri->relation), pubname),
719 : : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
720 : : "publish_via_partition_root")));
721 : : }
722 : 476 : }
723 : :
724 : : /*
725 : : * Create new publication.
726 : : */
727 : : ObjectAddress
1004 dean.a.rasheed@gmail 728 : 342 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
729 : : {
730 : : Relation rel;
731 : : ObjectAddress myself;
732 : : Oid puboid;
733 : : bool nulls[Natts_pg_publication];
734 : : Datum values[Natts_pg_publication];
735 : : HeapTuple tup;
736 : : bool publish_given;
737 : : PublicationActions pubactions;
738 : : bool publish_via_partition_root_given;
739 : : bool publish_via_partition_root;
740 : : AclResult aclresult;
738 tomas.vondra@postgre 741 : 342 : List *relations = NIL;
742 : 342 : List *schemaidlist = NIL;
743 : :
744 : : /* must have CREATE privilege on database */
518 peter@eisentraut.org 745 : 342 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
2642 peter_e@gmx.net 746 [ + + ]: 342 : if (aclresult != ACLCHECK_OK)
2325 747 : 3 : aclcheck_error(aclresult, OBJECT_DATABASE,
2642 748 : 3 : get_database_name(MyDatabaseId));
749 : :
750 : : /* FOR ALL TABLES requires superuser */
738 tomas.vondra@postgre 751 [ + + - + ]: 339 : if (stmt->for_all_tables && !superuser())
2642 peter_e@gmx.net 752 [ # # ]:UBC 0 : ereport(ERROR,
753 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
754 : : errmsg("must be superuser to create FOR ALL TABLES publication")));
755 : :
1910 andres@anarazel.de 756 :CBC 339 : rel = table_open(PublicationRelationId, RowExclusiveLock);
757 : :
758 : : /* Check if name is used */
1972 759 : 339 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
760 : : CStringGetDatum(stmt->pubname));
2642 peter_e@gmx.net 761 [ + + ]: 339 : if (OidIsValid(puboid))
762 [ + - ]: 3 : ereport(ERROR,
763 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
764 : : errmsg("publication \"%s\" already exists",
765 : : stmt->pubname)));
766 : :
767 : : /* Form a tuple. */
768 : 336 : memset(values, 0, sizeof(values));
769 : 336 : memset(nulls, false, sizeof(nulls));
770 : :
771 : 336 : values[Anum_pg_publication_pubname - 1] =
772 : 336 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
773 : 336 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
774 : :
1004 dean.a.rasheed@gmail 775 : 336 : parse_publication_options(pstate,
776 : : stmt->options,
777 : : &publish_given, &pubactions,
778 : : &publish_via_partition_root_given,
779 : : &publish_via_partition_root);
780 : :
1972 andres@anarazel.de 781 : 327 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
782 : : Anum_pg_publication_oid);
783 : 327 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
2642 peter_e@gmx.net 784 : 327 : values[Anum_pg_publication_puballtables - 1] =
738 tomas.vondra@postgre 785 : 327 : BoolGetDatum(stmt->for_all_tables);
2642 peter_e@gmx.net 786 : 327 : values[Anum_pg_publication_pubinsert - 1] =
1467 peter@eisentraut.org 787 : 327 : BoolGetDatum(pubactions.pubinsert);
2642 peter_e@gmx.net 788 : 327 : values[Anum_pg_publication_pubupdate - 1] =
1467 peter@eisentraut.org 789 : 327 : BoolGetDatum(pubactions.pubupdate);
2642 peter_e@gmx.net 790 : 327 : values[Anum_pg_publication_pubdelete - 1] =
1467 peter@eisentraut.org 791 : 327 : BoolGetDatum(pubactions.pubdelete);
2199 peter_e@gmx.net 792 : 327 : values[Anum_pg_publication_pubtruncate - 1] =
1467 peter@eisentraut.org 793 : 327 : BoolGetDatum(pubactions.pubtruncate);
794 : 327 : values[Anum_pg_publication_pubviaroot - 1] =
795 : 327 : BoolGetDatum(publish_via_partition_root);
796 : :
2642 peter_e@gmx.net 797 : 327 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
798 : :
799 : : /* Insert tuple into catalog. */
1972 andres@anarazel.de 800 : 327 : CatalogTupleInsert(rel, tup);
2642 peter_e@gmx.net 801 : 327 : heap_freetuple(tup);
802 : :
2641 alvherre@alvh.no-ip. 803 : 327 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
804 : :
2642 peter_e@gmx.net 805 : 327 : ObjectAddressSet(myself, PublicationRelationId, puboid);
806 : :
807 : : /* Make the changes visible. */
808 : 327 : CommandCounterIncrement();
809 : :
810 : : /* Associate objects with the publication. */
738 tomas.vondra@postgre 811 [ + + ]: 327 : if (stmt->for_all_tables)
812 : : {
813 : : /* Invalidate relcache so that publication info is rebuilt. */
949 akapila@postgresql.o 814 : 31 : CacheInvalidateRelcacheAll();
815 : : }
816 : : else
817 : : {
738 tomas.vondra@postgre 818 : 296 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
819 : : &schemaidlist);
820 : :
821 : : /* FOR TABLES IN SCHEMA requires superuser */
606 tgl@sss.pgh.pa.us 822 [ + + + + ]: 287 : if (schemaidlist != NIL && !superuser())
900 akapila@postgresql.o 823 [ + - ]: 3 : ereport(ERROR,
824 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
825 : : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
826 : :
606 tgl@sss.pgh.pa.us 827 [ + + ]: 284 : if (relations != NIL)
828 : : {
829 : : List *rels;
830 : :
738 tomas.vondra@postgre 831 : 188 : rels = OpenTableList(relations);
782 akapila@postgresql.o 832 : 176 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
833 : : publish_via_partition_root);
834 : :
565 alvherre@alvh.no-ip. 835 : 164 : CheckPubRelationColumnList(stmt->pubname, rels,
836 : : schemaidlist != NIL,
837 : : publish_via_partition_root);
838 : :
738 tomas.vondra@postgre 839 : 161 : PublicationAddTables(puboid, rels, true, NULL);
840 : 148 : CloseTableList(rels);
841 : : }
842 : :
606 tgl@sss.pgh.pa.us 843 [ + + ]: 244 : if (schemaidlist != NIL)
844 : : {
845 : : /*
846 : : * Schema lock is held until the publication is created to prevent
847 : : * concurrent schema deletion.
848 : : */
738 tomas.vondra@postgre 849 : 67 : LockSchemaList(schemaidlist);
850 : 67 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
851 : : }
852 : : }
853 : :
1910 andres@anarazel.de 854 : 272 : table_close(rel, RowExclusiveLock);
855 : :
2642 peter_e@gmx.net 856 [ - + ]: 272 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
857 : :
1737 tmunro@postgresql.or 858 [ + + ]: 272 : if (wal_level != WAL_LEVEL_LOGICAL)
859 [ + - ]: 153 : ereport(WARNING,
860 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
861 : : errmsg("wal_level is insufficient to publish logical changes"),
862 : : errhint("Set wal_level to \"logical\" before creating subscriptions.")));
863 : :
2642 peter_e@gmx.net 864 : 272 : return myself;
865 : : }
866 : :
867 : : /*
868 : : * Change options of a publication.
869 : : */
870 : : static void
1004 dean.a.rasheed@gmail 871 : 55 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
872 : : Relation rel, HeapTuple tup)
873 : : {
874 : : bool nulls[Natts_pg_publication];
875 : : bool replaces[Natts_pg_publication];
876 : : Datum values[Natts_pg_publication];
877 : : bool publish_given;
878 : : PublicationActions pubactions;
879 : : bool publish_via_partition_root_given;
880 : : bool publish_via_partition_root;
881 : : ObjectAddress obj;
882 : : Form_pg_publication pubform;
782 akapila@postgresql.o 883 : 55 : List *root_relids = NIL;
884 : : ListCell *lc;
885 : :
1004 dean.a.rasheed@gmail 886 : 55 : parse_publication_options(pstate,
887 : : stmt->options,
888 : : &publish_given, &pubactions,
889 : : &publish_via_partition_root_given,
890 : : &publish_via_partition_root);
891 : :
782 akapila@postgresql.o 892 : 55 : pubform = (Form_pg_publication) GETSTRUCT(tup);
893 : :
894 : : /*
895 : : * If the publication doesn't publish changes via the root partitioned
896 : : * table, the partition's row filter and column list will be used. So
897 : : * disallow using WHERE clause and column lists on partitioned table in
898 : : * this case.
899 : : */
900 [ + + + + ]: 55 : if (!pubform->puballtables && publish_via_partition_root_given &&
901 [ + + ]: 40 : !publish_via_partition_root)
902 : : {
903 : : /*
904 : : * Lock the publication so nobody else can do anything with it. This
905 : : * prevents concurrent alter to add partitioned table(s) with WHERE
906 : : * clause(s) and/or column lists which we don't allow when not
907 : : * publishing via root.
908 : : */
909 : 24 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
910 : : AccessShareLock);
911 : :
912 : 24 : root_relids = GetPublicationRelations(pubform->oid,
913 : : PUBLICATION_PART_ROOT);
914 : :
915 [ + - + + : 42 : foreach(lc, root_relids)
+ + ]
916 : : {
917 : 24 : Oid relid = lfirst_oid(lc);
918 : : HeapTuple rftuple;
919 : : char relkind;
920 : : char *relname;
921 : : bool has_rowfilter;
922 : : bool has_collist;
923 : :
924 : : /*
925 : : * Beware: we don't have lock on the relations, so cope silently
926 : : * with the cache lookups returning NULL.
927 : : */
928 : :
929 : 24 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
930 : : ObjectIdGetDatum(relid),
931 : : ObjectIdGetDatum(pubform->oid));
732 alvherre@alvh.no-ip. 932 [ - + ]: 24 : if (!HeapTupleIsValid(rftuple))
732 alvherre@alvh.no-ip. 933 :UBC 0 : continue;
732 alvherre@alvh.no-ip. 934 :CBC 24 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
935 : 24 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
936 [ + + + + ]: 24 : if (!has_rowfilter && !has_collist)
937 : : {
938 : 6 : ReleaseSysCache(rftuple);
939 : 6 : continue;
940 : : }
941 : :
942 : 18 : relkind = get_rel_relkind(relid);
943 [ + + ]: 18 : if (relkind != RELKIND_PARTITIONED_TABLE)
944 : : {
945 : 12 : ReleaseSysCache(rftuple);
946 : 12 : continue;
947 : : }
948 : 6 : relname = get_rel_name(relid);
949 [ - + ]: 6 : if (relname == NULL) /* table concurrently dropped */
950 : : {
782 akapila@postgresql.o 951 :UBC 0 : ReleaseSysCache(rftuple);
732 alvherre@alvh.no-ip. 952 : 0 : continue;
953 : : }
954 : :
732 alvherre@alvh.no-ip. 955 [ + + ]:CBC 6 : if (has_rowfilter)
956 [ + - ]: 3 : ereport(ERROR,
957 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
959 : : "publish_via_partition_root",
960 : : stmt->pubname),
961 : : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
962 : : relname, "publish_via_partition_root")));
963 [ - + ]: 3 : Assert(has_collist);
964 [ + - ]: 3 : ereport(ERROR,
965 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
966 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
967 : : "publish_via_partition_root",
968 : : stmt->pubname),
969 : : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
970 : : relname, "publish_via_partition_root")));
971 : : }
972 : : }
973 : :
974 : : /* Everything ok, form a new tuple. */
2642 peter_e@gmx.net 975 : 49 : memset(values, 0, sizeof(values));
976 : 49 : memset(nulls, false, sizeof(nulls));
977 : 49 : memset(replaces, false, sizeof(replaces));
978 : :
2529 979 [ + + ]: 49 : if (publish_given)
980 : : {
1467 peter@eisentraut.org 981 : 14 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
2642 peter_e@gmx.net 982 : 14 : replaces[Anum_pg_publication_pubinsert - 1] = true;
983 : :
1467 peter@eisentraut.org 984 : 14 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
2642 peter_e@gmx.net 985 : 14 : replaces[Anum_pg_publication_pubupdate - 1] = true;
986 : :
1467 peter@eisentraut.org 987 : 14 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
2642 peter_e@gmx.net 988 : 14 : replaces[Anum_pg_publication_pubdelete - 1] = true;
989 : :
1467 peter@eisentraut.org 990 : 14 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
2199 peter_e@gmx.net 991 : 14 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
992 : : }
993 : :
1467 peter@eisentraut.org 994 [ + + ]: 49 : if (publish_via_partition_root_given)
995 : : {
996 : 35 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
997 : 35 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
998 : : }
999 : :
2642 peter_e@gmx.net 1000 : 49 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1001 : : replaces);
1002 : :
1003 : : /* Update the catalog. */
2630 alvherre@alvh.no-ip. 1004 : 49 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1005 : :
2642 peter_e@gmx.net 1006 : 49 : CommandCounterIncrement();
1007 : :
1972 andres@anarazel.de 1008 : 49 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1009 : :
1010 : : /* Invalidate the relcache. */
738 tomas.vondra@postgre 1011 [ + + ]: 49 : if (pubform->puballtables)
1012 : : {
2642 peter_e@gmx.net 1013 : 4 : CacheInvalidateRelcacheAll();
1014 : : }
1015 : : else
1016 : : {
900 akapila@postgresql.o 1017 : 45 : List *relids = NIL;
1018 : 45 : List *schemarelids = NIL;
1019 : :
1020 : : /*
1021 : : * For any partitioned tables contained in the publication, we must
1022 : : * invalidate all partitions contained in the respective partition
1023 : : * trees, not just those explicitly mentioned in the publication.
1024 : : */
782 1025 [ + + ]: 45 : if (root_relids == NIL)
1026 : 27 : relids = GetPublicationRelations(pubform->oid,
1027 : : PUBLICATION_PART_ALL);
1028 : : else
1029 : : {
1030 : : /*
1031 : : * We already got tables explicitly mentioned in the publication.
1032 : : * Now get all partitions for the partitioned table in the list.
1033 : : */
1034 [ + - + + : 36 : foreach(lc, root_relids)
+ + ]
1035 : 18 : relids = GetPubPartitionOptionRelations(relids,
1036 : : PUBLICATION_PART_ALL,
1037 : : lfirst_oid(lc));
1038 : : }
1039 : :
900 1040 : 45 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1041 : : PUBLICATION_PART_ALL);
1042 : 45 : relids = list_concat_unique_oid(relids, schemarelids);
1043 : :
935 1044 : 45 : InvalidatePublicationRels(relids);
1045 : : }
1046 : :
1972 andres@anarazel.de 1047 : 49 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
2642 peter_e@gmx.net 1048 : 49 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1049 : : (Node *) stmt);
1050 : :
1972 andres@anarazel.de 1051 [ - + ]: 49 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
2642 peter_e@gmx.net 1052 : 49 : }
1053 : :
1054 : : /*
1055 : : * Invalidate the relations.
1056 : : */
1057 : : void
935 akapila@postgresql.o 1058 : 1088 : InvalidatePublicationRels(List *relids)
1059 : : {
1060 : : /*
1061 : : * We don't want to send too many individual messages, at some point it's
1062 : : * cheaper to just reset whole relcache.
1063 : : */
1064 [ + - ]: 1088 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1065 : : {
1066 : : ListCell *lc;
1067 : :
1068 [ + + + + : 10438 : foreach(lc, relids)
+ + ]
1069 : 9350 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1070 : : }
1071 : : else
935 akapila@postgresql.o 1072 :UBC 0 : CacheInvalidateRelcacheAll();
935 akapila@postgresql.o 1073 :CBC 1088 : }
1074 : :
1075 : : /*
1076 : : * Add or remove table to/from publication.
1077 : : */
1078 : : static void
900 1079 : 424 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1080 : : List *tables, const char *queryString,
1081 : : bool publish_schema)
1082 : : {
2642 peter_e@gmx.net 1083 : 424 : List *rels = NIL;
1084 : 424 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1972 andres@anarazel.de 1085 : 424 : Oid pubid = pubform->oid;
1086 : :
1087 : : /*
1088 : : * Nothing to do if no objects, except in SET: for that it is quite
1089 : : * possible that user has not specified any tables in which case we need
1090 : : * to remove all the existing tables.
1091 : : */
832 alvherre@alvh.no-ip. 1092 [ + + + + ]: 424 : if (!tables && stmt->action != AP_SetObjects)
900 akapila@postgresql.o 1093 : 33 : return;
1094 : :
738 tomas.vondra@postgre 1095 : 391 : rels = OpenTableList(tables);
1096 : :
832 alvherre@alvh.no-ip. 1097 [ + + ]: 391 : if (stmt->action == AP_AddObjects)
1098 : : {
782 akapila@postgresql.o 1099 : 134 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1100 : :
569 1101 : 125 : publish_schema |= is_schema_publication(pubid);
1102 : :
565 alvherre@alvh.no-ip. 1103 : 125 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
569 akapila@postgresql.o 1104 : 125 : pubform->pubviaroot);
1105 : :
738 tomas.vondra@postgre 1106 : 119 : PublicationAddTables(pubid, rels, false, stmt);
1107 : : }
832 alvherre@alvh.no-ip. 1108 [ + + ]: 257 : else if (stmt->action == AP_DropObjects)
738 tomas.vondra@postgre 1109 : 46 : PublicationDropTables(pubid, rels, false);
1110 : : else /* AP_SetObjects */
1111 : : {
1431 tgl@sss.pgh.pa.us 1112 : 211 : List *oldrelids = GetPublicationRelations(pubid,
1113 : : PUBLICATION_PART_ROOT);
2642 peter_e@gmx.net 1114 : 211 : List *delrels = NIL;
1115 : : ListCell *oldlc;
1116 : :
782 akapila@postgresql.o 1117 : 211 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1118 : :
565 alvherre@alvh.no-ip. 1119 : 202 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
569 akapila@postgresql.o 1120 : 202 : pubform->pubviaroot);
1121 : :
1122 : : /*
1123 : : * To recreate the relation list for the publication, look for
1124 : : * existing relations that do not need to be dropped.
1125 : : */
2642 peter_e@gmx.net 1126 [ + + + + : 385 : foreach(oldlc, oldrelids)
+ + ]
1127 : : {
1128 : 189 : Oid oldrelid = lfirst_oid(oldlc);
1129 : : ListCell *newlc;
1130 : : PublicationRelInfo *oldrel;
1131 : 189 : bool found = false;
1132 : : HeapTuple rftuple;
782 akapila@postgresql.o 1133 : 189 : Node *oldrelwhereclause = NULL;
750 tomas.vondra@postgre 1134 : 189 : Bitmapset *oldcolumns = NULL;
1135 : :
1136 : : /* look up the cache for the old relmap */
782 akapila@postgresql.o 1137 : 189 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1138 : : ObjectIdGetDatum(oldrelid),
1139 : : ObjectIdGetDatum(pubid));
1140 : :
1141 : : /*
1142 : : * See if the existing relation currently has a WHERE clause or a
1143 : : * column list. We need to compare those too.
1144 : : */
1145 [ + - ]: 189 : if (HeapTupleIsValid(rftuple))
1146 : : {
750 tomas.vondra@postgre 1147 : 189 : bool isnull = true;
1148 : : Datum whereClauseDatum;
1149 : : Datum columnListDatum;
1150 : :
1151 : : /* Load the WHERE clause for this table. */
782 akapila@postgresql.o 1152 : 189 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1153 : : Anum_pg_publication_rel_prqual,
1154 : : &isnull);
750 tomas.vondra@postgre 1155 [ + + ]: 189 : if (!isnull)
782 akapila@postgresql.o 1156 : 105 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1157 : :
1158 : : /* Transform the int2vector column list to a bitmap. */
750 tomas.vondra@postgre 1159 : 189 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1160 : : Anum_pg_publication_rel_prattrs,
1161 : : &isnull);
1162 : :
1163 [ + + ]: 189 : if (!isnull)
1164 : 62 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1165 : :
782 akapila@postgresql.o 1166 : 189 : ReleaseSysCache(rftuple);
1167 : : }
1168 : :
2642 peter_e@gmx.net 1169 [ + + + + : 371 : foreach(newlc, rels)
+ + ]
1170 : : {
1171 : : PublicationRelInfo *newpubrel;
1172 : : Oid newrelid;
703 tgl@sss.pgh.pa.us 1173 : 190 : Bitmapset *newcolumns = NULL;
1174 : :
951 alvherre@alvh.no-ip. 1175 : 190 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
750 tomas.vondra@postgre 1176 : 190 : newrelid = RelationGetRelid(newpubrel->relation);
1177 : :
1178 : : /*
1179 : : * If the new publication has column list, transform it to a
1180 : : * bitmap too.
1181 : : */
1182 [ + + ]: 190 : if (newpubrel->columns)
1183 : : {
1184 : : ListCell *lc;
1185 : :
1186 [ + - + + : 177 : foreach(lc, newpubrel->columns)
+ + ]
1187 : : {
1188 : 109 : char *colname = strVal(lfirst(lc));
1189 : 109 : AttrNumber attnum = get_attnum(newrelid, colname);
1190 : :
1191 : 109 : newcolumns = bms_add_member(newcolumns, attnum);
1192 : : }
1193 : : }
1194 : :
1195 : : /*
1196 : : * Check if any of the new set of relations matches with the
1197 : : * existing relations in the publication. Additionally, if the
1198 : : * relation has an associated WHERE clause, check the WHERE
1199 : : * expressions also match. Same for the column list. Drop the
1200 : : * rest.
1201 : : */
951 alvherre@alvh.no-ip. 1202 [ + + ]: 190 : if (RelationGetRelid(newpubrel->relation) == oldrelid)
1203 : : {
750 tomas.vondra@postgre 1204 [ + + + + ]: 130 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1205 : 34 : bms_equal(oldcolumns, newcolumns))
1206 : : {
782 akapila@postgresql.o 1207 : 8 : found = true;
1208 : 8 : break;
1209 : : }
1210 : : }
1211 : : }
1212 : :
1213 : : /*
1214 : : * Add the non-matched relations to a list so that they can be
1215 : : * dropped.
1216 : : */
1217 [ + + ]: 189 : if (!found)
1218 : : {
1219 : 181 : oldrel = palloc(sizeof(PublicationRelInfo));
1220 : 181 : oldrel->whereClause = NULL;
750 tomas.vondra@postgre 1221 : 181 : oldrel->columns = NIL;
782 akapila@postgresql.o 1222 : 181 : oldrel->relation = table_open(oldrelid,
1223 : : ShareUpdateExclusiveLock);
1224 : 181 : delrels = lappend(delrels, oldrel);
1225 : : }
1226 : : }
1227 : :
1228 : : /* And drop them. */
738 tomas.vondra@postgre 1229 : 196 : PublicationDropTables(pubid, delrels, true);
1230 : :
1231 : : /*
1232 : : * Don't bother calculating the difference for adding, we'll catch and
1233 : : * skip existing ones when doing catalog update.
1234 : : */
1235 : 196 : PublicationAddTables(pubid, rels, true, stmt);
1236 : :
1237 : 196 : CloseTableList(delrels);
1238 : : }
1239 : :
1240 : 334 : CloseTableList(rels);
1241 : : }
1242 : :
1243 : : /*
1244 : : * Alter the publication schemas.
1245 : : *
1246 : : * Add or remove schemas to/from publication.
1247 : : */
1248 : : static void
900 akapila@postgresql.o 1249 : 367 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1250 : : HeapTuple tup, List *schemaidlist)
1251 : : {
1252 : 367 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1253 : :
1254 : : /*
1255 : : * Nothing to do if no objects, except in SET: for that it is quite
1256 : : * possible that user has not specified any schemas in which case we need
1257 : : * to remove all the existing schemas.
1258 : : */
832 alvherre@alvh.no-ip. 1259 [ + + + + ]: 367 : if (!schemaidlist && stmt->action != AP_SetObjects)
900 akapila@postgresql.o 1260 : 138 : return;
1261 : :
1262 : : /*
1263 : : * Schema lock is held until the publication is altered to prevent
1264 : : * concurrent schema deletion.
1265 : : */
1266 : 229 : LockSchemaList(schemaidlist);
832 alvherre@alvh.no-ip. 1267 [ + + ]: 229 : if (stmt->action == AP_AddObjects)
1268 : : {
1269 : : ListCell *lc;
1270 : : List *reloids;
1271 : :
738 tomas.vondra@postgre 1272 : 14 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1273 : :
569 akapila@postgresql.o 1274 [ + + + + : 18 : foreach(lc, reloids)
+ + ]
1275 : : {
1276 : : HeapTuple coltuple;
1277 : :
1278 : 7 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1279 : : ObjectIdGetDatum(lfirst_oid(lc)),
1280 : : ObjectIdGetDatum(pubform->oid));
1281 : :
1282 [ - + ]: 7 : if (!HeapTupleIsValid(coltuple))
569 akapila@postgresql.o 1283 :UBC 0 : continue;
1284 : :
1285 : : /*
1286 : : * Disallow adding schema if column list is already part of the
1287 : : * publication. See CheckPubRelationColumnList.
1288 : : */
569 akapila@postgresql.o 1289 [ + + ]:CBC 7 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1290 [ + - ]: 3 : ereport(ERROR,
1291 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1292 : : errmsg("cannot add schema to publication \"%s\"",
1293 : : stmt->pubname),
1294 : : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1295 : :
1296 : 4 : ReleaseSysCache(coltuple);
1297 : : }
1298 : :
738 tomas.vondra@postgre 1299 : 11 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1300 : : }
832 alvherre@alvh.no-ip. 1301 [ + + ]: 215 : else if (stmt->action == AP_DropObjects)
738 tomas.vondra@postgre 1302 : 19 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1303 : : else /* AP_SetObjects */
1304 : : {
1305 : 196 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
900 akapila@postgresql.o 1306 : 196 : List *delschemas = NIL;
1307 : :
1308 : : /* Identify which schemas should be dropped */
1309 : 196 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1310 : :
1311 : : /*
1312 : : * Schema lock is held until the publication is altered to prevent
1313 : : * concurrent schema deletion.
1314 : : */
1315 : 196 : LockSchemaList(delschemas);
1316 : :
1317 : : /* And drop them */
738 tomas.vondra@postgre 1318 : 196 : PublicationDropSchemas(pubform->oid, delschemas, true);
1319 : :
1320 : : /*
1321 : : * Don't bother calculating the difference for adding, we'll catch and
1322 : : * skip existing ones when doing catalog update.
1323 : : */
1324 : 196 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1325 : : }
1326 : : }
1327 : :
1328 : : /*
1329 : : * Check if relations and schemas can be in a given publication and throw
1330 : : * appropriate error if not.
1331 : : */
1332 : : static void
900 akapila@postgresql.o 1333 : 445 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1334 : : List *tables, List *schemaidlist)
1335 : : {
1336 : 445 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1337 : :
832 alvherre@alvh.no-ip. 1338 [ + + + + : 445 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
+ + ]
738 tomas.vondra@postgre 1339 [ + + ]: 47 : schemaidlist && !superuser())
900 akapila@postgresql.o 1340 [ + - ]: 3 : ereport(ERROR,
1341 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1342 : : errmsg("must be superuser to add or set schemas")));
1343 : :
1344 : : /*
1345 : : * Check that user is allowed to manipulate the publication tables in
1346 : : * schema
1347 : : */
738 tomas.vondra@postgre 1348 [ + + + + ]: 442 : if (schemaidlist && pubform->puballtables)
900 akapila@postgresql.o 1349 [ + - ]: 9 : ereport(ERROR,
1350 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1351 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1352 : : NameStr(pubform->pubname)),
1353 : : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1354 : :
1355 : : /* Check that user is allowed to manipulate the publication tables. */
1356 [ + + + + ]: 433 : if (tables && pubform->puballtables)
1357 [ + - ]: 9 : ereport(ERROR,
1358 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1359 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1360 : : NameStr(pubform->pubname)),
1361 : : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1362 : 424 : }
1363 : :
1364 : : /*
1365 : : * Alter the existing publication.
1366 : : *
1367 : : * This is dispatcher function for AlterPublicationOptions,
1368 : : * AlterPublicationSchemas and AlterPublicationTables.
1369 : : */
1370 : : void
1004 dean.a.rasheed@gmail 1371 : 509 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1372 : : {
1373 : : Relation rel;
1374 : : HeapTuple tup;
1375 : : Form_pg_publication pubform;
1376 : :
1910 andres@anarazel.de 1377 : 509 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1378 : :
2642 peter_e@gmx.net 1379 : 509 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1380 : : CStringGetDatum(stmt->pubname));
1381 : :
1382 [ - + ]: 509 : if (!HeapTupleIsValid(tup))
2642 peter_e@gmx.net 1383 [ # # ]:UBC 0 : ereport(ERROR,
1384 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1385 : : errmsg("publication \"%s\" does not exist",
1386 : : stmt->pubname)));
1387 : :
1972 andres@anarazel.de 1388 :CBC 509 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1389 : :
1390 : : /* must be owner */
518 peter@eisentraut.org 1391 [ - + ]: 509 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
2325 peter_e@gmx.net 1392 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2642 1393 : 0 : stmt->pubname);
1394 : :
2642 peter_e@gmx.net 1395 [ + + ]:CBC 509 : if (stmt->options)
1004 dean.a.rasheed@gmail 1396 : 55 : AlterPublicationOptions(pstate, stmt, rel, tup);
1397 : : else
1398 : : {
738 tomas.vondra@postgre 1399 : 454 : List *relations = NIL;
1400 : 454 : List *schemaidlist = NIL;
782 akapila@postgresql.o 1401 : 454 : Oid pubid = pubform->oid;
1402 : :
738 tomas.vondra@postgre 1403 : 454 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1404 : : &schemaidlist);
1405 : :
1406 : 445 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1407 : :
782 akapila@postgresql.o 1408 : 424 : heap_freetuple(tup);
1409 : :
1410 : : /* Lock the publication so nobody else can do anything with it. */
1411 : 424 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1412 : : AccessExclusiveLock);
1413 : :
1414 : : /*
1415 : : * It is possible that by the time we acquire the lock on publication,
1416 : : * concurrent DDL has removed it. We can test this by checking the
1417 : : * existence of publication. We get the tuple again to avoid the risk
1418 : : * of any publication option getting changed.
1419 : : */
1420 : 424 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1421 [ - + ]: 424 : if (!HeapTupleIsValid(tup))
900 akapila@postgresql.o 1422 [ # # ]:UBC 0 : ereport(ERROR,
1423 : : errcode(ERRCODE_UNDEFINED_OBJECT),
1424 : : errmsg("publication \"%s\" does not exist",
1425 : : stmt->pubname));
1426 : :
569 akapila@postgresql.o 1427 :CBC 424 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1428 : : schemaidlist != NIL);
738 tomas.vondra@postgre 1429 : 367 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1430 : : }
1431 : :
1432 : : /* Cleanup. */
2642 peter_e@gmx.net 1433 : 407 : heap_freetuple(tup);
1910 andres@anarazel.de 1434 : 407 : table_close(rel, RowExclusiveLock);
2642 peter_e@gmx.net 1435 : 407 : }
1436 : :
1437 : : /*
1438 : : * Remove relation from publication by mapping OID.
1439 : : */
1440 : : void
1441 : 380 : RemovePublicationRelById(Oid proid)
1442 : : {
1443 : : Relation rel;
1444 : : HeapTuple tup;
1445 : : Form_pg_publication_rel pubrel;
935 akapila@postgresql.o 1446 : 380 : List *relids = NIL;
1447 : :
1910 andres@anarazel.de 1448 : 380 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1449 : :
2642 peter_e@gmx.net 1450 : 380 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1451 : :
1452 [ - + ]: 380 : if (!HeapTupleIsValid(tup))
2642 peter_e@gmx.net 1453 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication table %u",
1454 : : proid);
1455 : :
2642 peter_e@gmx.net 1456 :CBC 380 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1457 : :
1458 : : /*
1459 : : * Invalidate relcache so that publication info is rebuilt.
1460 : : *
1461 : : * For the partitioned tables, we must invalidate all partitions contained
1462 : : * in the respective partition hierarchies, not just the one explicitly
1463 : : * mentioned in the publication. This is required because we implicitly
1464 : : * publish the child tables when the parent table is published.
1465 : : */
935 akapila@postgresql.o 1466 : 380 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1467 : : pubrel->prrelid);
1468 : :
1469 : 380 : InvalidatePublicationRels(relids);
1470 : :
2629 tgl@sss.pgh.pa.us 1471 : 380 : CatalogTupleDelete(rel, &tup->t_self);
1472 : :
2642 peter_e@gmx.net 1473 : 380 : ReleaseSysCache(tup);
1474 : :
1910 andres@anarazel.de 1475 : 380 : table_close(rel, RowExclusiveLock);
2642 peter_e@gmx.net 1476 : 380 : }
1477 : :
1478 : : /*
1479 : : * Remove the publication by mapping OID.
1480 : : */
1481 : : void
949 akapila@postgresql.o 1482 : 185 : RemovePublicationById(Oid pubid)
1483 : : {
1484 : : Relation rel;
1485 : : HeapTuple tup;
1486 : : Form_pg_publication pubform;
1487 : :
1488 : 185 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1489 : :
1490 : 185 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1491 [ - + ]: 185 : if (!HeapTupleIsValid(tup))
949 akapila@postgresql.o 1492 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1493 : :
836 alvherre@alvh.no-ip. 1494 :CBC 185 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1495 : :
1496 : : /* Invalidate relcache so that publication info is rebuilt. */
738 tomas.vondra@postgre 1497 [ + + ]: 185 : if (pubform->puballtables)
949 akapila@postgresql.o 1498 : 12 : CacheInvalidateRelcacheAll();
1499 : :
1500 : 185 : CatalogTupleDelete(rel, &tup->t_self);
1501 : :
1502 : 185 : ReleaseSysCache(tup);
1503 : :
1504 : 185 : table_close(rel, RowExclusiveLock);
1505 : 185 : }
1506 : :
1507 : : /*
1508 : : * Remove schema from publication by mapping OID.
1509 : : */
1510 : : void
900 1511 : 96 : RemovePublicationSchemaById(Oid psoid)
1512 : : {
1513 : : Relation rel;
1514 : : HeapTuple tup;
1515 : 96 : List *schemaRels = NIL;
1516 : : Form_pg_publication_namespace pubsch;
1517 : :
1518 : 96 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1519 : :
1520 : 96 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1521 : :
1522 [ - + ]: 96 : if (!HeapTupleIsValid(tup))
900 akapila@postgresql.o 1523 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1524 : :
900 akapila@postgresql.o 1525 :CBC 96 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1526 : :
1527 : : /*
1528 : : * Invalidate relcache so that publication info is rebuilt. See
1529 : : * RemovePublicationRelById for why we need to consider all the
1530 : : * partitions.
1531 : : */
1532 : 96 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1533 : : PUBLICATION_PART_ALL);
1534 : 96 : InvalidatePublicationRels(schemaRels);
1535 : :
1536 : 96 : CatalogTupleDelete(rel, &tup->t_self);
1537 : :
1538 : 96 : ReleaseSysCache(tup);
1539 : :
1540 : 96 : table_close(rel, RowExclusiveLock);
1541 : 96 : }
1542 : :
1543 : : /*
1544 : : * Open relations specified by a PublicationTable list.
1545 : : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1546 : : * add them to a publication.
1547 : : */
1548 : : static List *
738 tomas.vondra@postgre 1549 : 579 : OpenTableList(List *tables)
1550 : : {
2642 peter_e@gmx.net 1551 : 579 : List *relids = NIL;
738 tomas.vondra@postgre 1552 : 579 : List *rels = NIL;
1553 : : ListCell *lc;
782 akapila@postgresql.o 1554 : 579 : List *relids_with_rf = NIL;
750 tomas.vondra@postgre 1555 : 579 : List *relids_with_collist = NIL;
1556 : :
1557 : : /*
1558 : : * Open, share-lock, and check all the explicitly-specified relations
1559 : : */
738 1560 [ + + + + : 1188 : foreach(lc, tables)
+ + ]
1561 : : {
951 alvherre@alvh.no-ip. 1562 : 621 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1563 : 621 : bool recurse = t->relation->inh;
1564 : : Relation rel;
1565 : : Oid myrelid;
1566 : : PublicationRelInfo *pub_rel;
1567 : :
1568 : : /* Allow query cancel in case this takes a long time */
2642 peter_e@gmx.net 1569 [ - + ]: 621 : CHECK_FOR_INTERRUPTS();
1570 : :
951 alvherre@alvh.no-ip. 1571 : 621 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
2642 peter_e@gmx.net 1572 : 621 : myrelid = RelationGetRelid(rel);
1573 : :
1574 : : /*
1575 : : * Filter out duplicates if user specifies "foo, foo".
1576 : : *
1577 : : * Note that this algorithm is known to not be very efficient (O(N^2))
1578 : : * but given that it only works on list of tables given to us by user
1579 : : * it's deemed acceptable.
1580 : : */
1581 [ + + ]: 621 : if (list_member_oid(relids, myrelid))
1582 : : {
1583 : : /* Disallow duplicate tables if there are any with row filters. */
782 akapila@postgresql.o 1584 [ + + + + ]: 12 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1585 [ + - ]: 6 : ereport(ERROR,
1586 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1587 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1588 : : RelationGetRelationName(rel))));
1589 : :
1590 : : /* Disallow duplicate tables if there are any with column lists. */
750 tomas.vondra@postgre 1591 [ + + + - ]: 6 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1592 [ + - ]: 6 : ereport(ERROR,
1593 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1594 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1595 : : RelationGetRelationName(rel))));
1596 : :
1910 andres@anarazel.de 1597 :UBC 0 : table_close(rel, ShareUpdateExclusiveLock);
2642 peter_e@gmx.net 1598 : 0 : continue;
1599 : : }
1600 : :
951 alvherre@alvh.no-ip. 1601 :CBC 609 : pub_rel = palloc(sizeof(PublicationRelInfo));
1602 : 609 : pub_rel->relation = rel;
782 akapila@postgresql.o 1603 : 609 : pub_rel->whereClause = t->whereClause;
750 tomas.vondra@postgre 1604 : 609 : pub_rel->columns = t->columns;
738 1605 : 609 : rels = lappend(rels, pub_rel);
2642 peter_e@gmx.net 1606 : 609 : relids = lappend_oid(relids, myrelid);
1607 : :
782 akapila@postgresql.o 1608 [ + + ]: 609 : if (t->whereClause)
1609 : 186 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1610 : :
750 tomas.vondra@postgre 1611 [ + + ]: 609 : if (t->columns)
1612 : 166 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1613 : :
1614 : : /*
1615 : : * Add children of this rel, if requested, so that they too are added
1616 : : * to the publication. A partitioned table can't have any inheritance
1617 : : * children other than its partitions, which need not be explicitly
1618 : : * added to the publication.
1619 : : */
1496 peter@eisentraut.org 1620 [ + + + + ]: 609 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1621 : : {
1622 : : List *children;
1623 : : ListCell *child;
1624 : :
2642 peter_e@gmx.net 1625 : 518 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1626 : : NULL);
1627 : :
1628 [ + - + + : 1040 : foreach(child, children)
+ + ]
1629 : : {
1630 : 522 : Oid childrelid = lfirst_oid(child);
1631 : :
1632 : : /* Allow query cancel in case this takes a long time */
1955 tgl@sss.pgh.pa.us 1633 [ - + ]: 522 : CHECK_FOR_INTERRUPTS();
1634 : :
1635 : : /*
1636 : : * Skip duplicates if user specified both parent and child
1637 : : * tables.
1638 : : */
2642 peter_e@gmx.net 1639 [ + + ]: 522 : if (list_member_oid(relids, childrelid))
1640 : : {
1641 : : /*
1642 : : * We don't allow to specify row filter for both parent
1643 : : * and child table at the same time as it is not very
1644 : : * clear which one should be given preference.
1645 : : */
782 akapila@postgresql.o 1646 [ - + ]: 518 : if (childrelid != myrelid &&
782 akapila@postgresql.o 1647 [ # # # # ]:UBC 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1648 [ # # ]: 0 : ereport(ERROR,
1649 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1650 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1651 : : RelationGetRelationName(rel))));
1652 : :
1653 : : /*
1654 : : * We don't allow to specify column list for both parent
1655 : : * and child table at the same time as it is not very
1656 : : * clear which one should be given preference.
1657 : : */
750 tomas.vondra@postgre 1658 [ - + ]:CBC 518 : if (childrelid != myrelid &&
750 tomas.vondra@postgre 1659 [ # # # # ]:UBC 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1660 [ # # ]: 0 : ereport(ERROR,
1661 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1662 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1663 : : RelationGetRelationName(rel))));
1664 : :
2642 peter_e@gmx.net 1665 :CBC 518 : continue;
1666 : : }
1667 : :
1668 : : /* find_all_inheritors already got lock */
1910 andres@anarazel.de 1669 : 4 : rel = table_open(childrelid, NoLock);
951 alvherre@alvh.no-ip. 1670 : 4 : pub_rel = palloc(sizeof(PublicationRelInfo));
1671 : 4 : pub_rel->relation = rel;
1672 : : /* child inherits WHERE clause from parent */
782 akapila@postgresql.o 1673 : 4 : pub_rel->whereClause = t->whereClause;
1674 : :
1675 : : /* child inherits column list from parent */
750 tomas.vondra@postgre 1676 : 4 : pub_rel->columns = t->columns;
738 1677 : 4 : rels = lappend(rels, pub_rel);
2642 peter_e@gmx.net 1678 : 4 : relids = lappend_oid(relids, childrelid);
1679 : :
782 akapila@postgresql.o 1680 [ + + ]: 4 : if (t->whereClause)
1681 : 1 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1682 : :
750 tomas.vondra@postgre 1683 [ - + ]: 4 : if (t->columns)
750 tomas.vondra@postgre 1684 :UBC 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1685 : : }
1686 : : }
1687 : : }
1688 : :
2642 peter_e@gmx.net 1689 :CBC 567 : list_free(relids);
782 akapila@postgresql.o 1690 : 567 : list_free(relids_with_rf);
1691 : :
738 tomas.vondra@postgre 1692 : 567 : return rels;
1693 : : }
1694 : :
1695 : : /*
1696 : : * Close all relations in the list.
1697 : : */
1698 : : static void
1699 : 678 : CloseTableList(List *rels)
1700 : : {
1701 : : ListCell *lc;
1702 : :
2642 peter_e@gmx.net 1703 [ + + + + : 1375 : foreach(lc, rels)
+ + ]
1704 : : {
1705 : : PublicationRelInfo *pub_rel;
1706 : :
951 alvherre@alvh.no-ip. 1707 : 697 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1708 : 697 : table_close(pub_rel->relation, NoLock);
1709 : : }
1710 : :
782 akapila@postgresql.o 1711 : 678 : list_free_deep(rels);
2642 peter_e@gmx.net 1712 : 678 : }
1713 : :
1714 : : /*
1715 : : * Lock the schemas specified in the schema list in AccessShareLock mode in
1716 : : * order to prevent concurrent schema deletion.
1717 : : */
1718 : : static void
900 akapila@postgresql.o 1719 : 492 : LockSchemaList(List *schemalist)
1720 : : {
1721 : : ListCell *lc;
1722 : :
1723 [ + + + + : 634 : foreach(lc, schemalist)
+ + ]
1724 : : {
1725 : 142 : Oid schemaid = lfirst_oid(lc);
1726 : :
1727 : : /* Allow query cancel in case this takes a long time */
1728 [ - + ]: 142 : CHECK_FOR_INTERRUPTS();
1729 : 142 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1730 : :
1731 : : /*
1732 : : * It is possible that by the time we acquire the lock on schema,
1733 : : * concurrent DDL has removed it. We can test this by checking the
1734 : : * existence of schema.
1735 : : */
1736 [ - + ]: 142 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
900 akapila@postgresql.o 1737 [ # # ]:UBC 0 : ereport(ERROR,
1738 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
1739 : : errmsg("schema with OID %u does not exist", schemaid));
1740 : : }
900 akapila@postgresql.o 1741 :CBC 492 : }
1742 : :
1743 : : /*
1744 : : * Add listed tables to the publication.
1745 : : */
1746 : : static void
738 tomas.vondra@postgre 1747 : 476 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1748 : : AlterPublicationStmt *stmt)
1749 : : {
1750 : : ListCell *lc;
1751 : :
1752 [ + + - + ]: 476 : Assert(!stmt || !stmt->for_all_tables);
1753 : :
2642 peter_e@gmx.net 1754 [ + + + + : 952 : foreach(lc, rels)
+ + ]
1755 : : {
951 alvherre@alvh.no-ip. 1756 : 507 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1757 : 507 : Relation rel = pub_rel->relation;
1758 : : ObjectAddress obj;
1759 : :
1760 : : /* Must be owner of the table or superuser. */
518 peter@eisentraut.org 1761 [ + + ]: 507 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
2325 peter_e@gmx.net 1762 : 3 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
2642 1763 : 3 : RelationGetRelationName(rel));
1764 : :
951 alvherre@alvh.no-ip. 1765 : 504 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
2642 peter_e@gmx.net 1766 [ + + ]: 476 : if (stmt)
1767 : : {
1768 : 297 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1769 : : (Node *) stmt);
1770 : :
1771 [ - + ]: 297 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1772 : : obj.objectId, 0);
1773 : : }
1774 : : }
1775 : 445 : }
1776 : :
1777 : : /*
1778 : : * Remove listed tables from the publication.
1779 : : */
1780 : : static void
738 tomas.vondra@postgre 1781 : 242 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1782 : : {
1783 : : ObjectAddress obj;
1784 : : ListCell *lc;
1785 : : Oid prid;
1786 : :
2642 peter_e@gmx.net 1787 [ + + + + : 463 : foreach(lc, rels)
+ + ]
1788 : : {
951 alvherre@alvh.no-ip. 1789 : 230 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1790 : 230 : Relation rel = pubrel->relation;
2642 peter_e@gmx.net 1791 : 230 : Oid relid = RelationGetRelid(rel);
1792 : :
750 tomas.vondra@postgre 1793 [ - + ]: 230 : if (pubrel->columns)
750 tomas.vondra@postgre 1794 [ # # ]:UBC 0 : ereport(ERROR,
1795 : : errcode(ERRCODE_SYNTAX_ERROR),
1796 : : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1797 : :
1972 andres@anarazel.de 1798 :CBC 230 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1799 : : ObjectIdGetDatum(relid),
1800 : : ObjectIdGetDatum(pubid));
2642 peter_e@gmx.net 1801 [ + + ]: 230 : if (!OidIsValid(prid))
1802 : : {
1803 [ - + ]: 6 : if (missing_ok)
2642 peter_e@gmx.net 1804 :UBC 0 : continue;
1805 : :
2642 peter_e@gmx.net 1806 [ + - ]:CBC 6 : ereport(ERROR,
1807 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1808 : : errmsg("relation \"%s\" is not part of the publication",
1809 : : RelationGetRelationName(rel))));
1810 : : }
1811 : :
782 akapila@postgresql.o 1812 [ + + ]: 224 : if (pubrel->whereClause)
1813 [ + - ]: 3 : ereport(ERROR,
1814 : : (errcode(ERRCODE_SYNTAX_ERROR),
1815 : : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1816 : :
2642 peter_e@gmx.net 1817 : 221 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1818 : 221 : performDeletion(&obj, DROP_CASCADE, 0);
1819 : : }
1820 : 233 : }
1821 : :
1822 : : /*
1823 : : * Add listed schemas to the publication.
1824 : : */
1825 : : static void
738 tomas.vondra@postgre 1826 : 274 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1827 : : AlterPublicationStmt *stmt)
1828 : : {
1829 : : ListCell *lc;
1830 : :
1831 [ + + - + ]: 274 : Assert(!stmt || !stmt->for_all_tables);
1832 : :
900 akapila@postgresql.o 1833 [ + + + + : 379 : foreach(lc, schemas)
+ + ]
1834 : : {
1835 : 111 : Oid schemaid = lfirst_oid(lc);
1836 : : ObjectAddress obj;
1837 : :
738 tomas.vondra@postgre 1838 : 111 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
900 akapila@postgresql.o 1839 [ + + ]: 105 : if (stmt)
1840 : : {
1841 : 29 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1842 : : (Node *) stmt);
1843 : :
1844 [ - + ]: 29 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1845 : : obj.objectId, 0);
1846 : : }
1847 : : }
1848 : 268 : }
1849 : :
1850 : : /*
1851 : : * Remove listed schemas from the publication.
1852 : : */
1853 : : static void
738 tomas.vondra@postgre 1854 : 215 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1855 : : {
1856 : : ObjectAddress obj;
1857 : : ListCell *lc;
1858 : : Oid psid;
1859 : :
900 akapila@postgresql.o 1860 [ + + + + : 240 : foreach(lc, schemas)
+ + ]
1861 : : {
1862 : 28 : Oid schemaid = lfirst_oid(lc);
1863 : :
738 tomas.vondra@postgre 1864 : 28 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1865 : : Anum_pg_publication_namespace_oid,
1866 : : ObjectIdGetDatum(schemaid),
1867 : : ObjectIdGetDatum(pubid));
900 akapila@postgresql.o 1868 [ + + ]: 28 : if (!OidIsValid(psid))
1869 : : {
1870 [ - + ]: 3 : if (missing_ok)
900 akapila@postgresql.o 1871 :UBC 0 : continue;
1872 : :
900 akapila@postgresql.o 1873 [ + - ]:CBC 3 : ereport(ERROR,
1874 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1875 : : errmsg("tables from schema \"%s\" are not part of the publication",
1876 : : get_namespace_name(schemaid))));
1877 : : }
1878 : :
1879 : 25 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1880 : 25 : performDeletion(&obj, DROP_CASCADE, 0);
1881 : : }
1882 : 212 : }
1883 : :
1884 : : /*
1885 : : * Internal workhorse for changing a publication owner
1886 : : */
1887 : : static void
2642 peter_e@gmx.net 1888 : 13 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1889 : : {
1890 : : Form_pg_publication form;
1891 : :
1892 : 13 : form = (Form_pg_publication) GETSTRUCT(tup);
1893 : :
1894 [ + + ]: 13 : if (form->pubowner == newOwnerId)
2642 peter_e@gmx.net 1895 :GBC 1 : return;
1896 : :
2617 peter_e@gmx.net 1897 [ + + ]:CBC 12 : if (!superuser())
1898 : : {
1899 : : AclResult aclresult;
1900 : :
1901 : : /* Must be owner */
518 peter@eisentraut.org 1902 [ - + ]: 6 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2325 peter_e@gmx.net 1903 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2617 1904 : 0 : NameStr(form->pubname));
1905 : :
1906 : : /* Must be able to become new owner */
513 rhaas@postgresql.org 1907 :CBC 6 : check_can_set_role(GetUserId(), newOwnerId);
1908 : :
1909 : : /* New owner must have CREATE privilege on database */
518 peter@eisentraut.org 1910 : 6 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2617 peter_e@gmx.net 1911 [ - + ]: 6 : if (aclresult != ACLCHECK_OK)
2325 peter_e@gmx.net 1912 :UBC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
2617 1913 : 0 : get_database_name(MyDatabaseId));
1914 : :
2617 peter_e@gmx.net 1915 [ - + - - ]:CBC 6 : if (form->puballtables && !superuser_arg(newOwnerId))
2617 peter_e@gmx.net 1916 [ # # ]:UBC 0 : ereport(ERROR,
1917 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1918 : : errmsg("permission denied to change owner of publication \"%s\"",
1919 : : NameStr(form->pubname)),
1920 : : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1921 : :
858 akapila@postgresql.o 1922 [ + + + - ]:CBC 6 : if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1923 [ + - ]: 3 : ereport(ERROR,
1924 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1925 : : errmsg("permission denied to change owner of publication \"%s\"",
1926 : : NameStr(form->pubname)),
1927 : : errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1928 : : }
1929 : :
2642 peter_e@gmx.net 1930 : 9 : form->pubowner = newOwnerId;
2630 alvherre@alvh.no-ip. 1931 : 9 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1932 : :
1933 : : /* Update owner dependency reference */
2642 peter_e@gmx.net 1934 : 9 : changeDependencyOnOwner(PublicationRelationId,
1935 : : form->oid,
1936 : : newOwnerId);
1937 : :
1938 [ - + ]: 9 : InvokeObjectPostAlterHook(PublicationRelationId,
1939 : : form->oid, 0);
1940 : : }
1941 : :
1942 : : /*
1943 : : * Change publication owner -- by name
1944 : : */
1945 : : ObjectAddress
1946 : 13 : AlterPublicationOwner(const char *name, Oid newOwnerId)
1947 : : {
1948 : : Oid subid;
1949 : : HeapTuple tup;
1950 : : Relation rel;
1951 : : ObjectAddress address;
1952 : : Form_pg_publication pubform;
1953 : :
1910 andres@anarazel.de 1954 : 13 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1955 : :
2642 peter_e@gmx.net 1956 : 13 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
1957 : :
1958 [ - + ]: 13 : if (!HeapTupleIsValid(tup))
2642 peter_e@gmx.net 1959 [ # # ]:UBC 0 : ereport(ERROR,
1960 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1961 : : errmsg("publication \"%s\" does not exist", name)));
1962 : :
1972 andres@anarazel.de 1963 :CBC 13 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1964 : 13 : subid = pubform->oid;
1965 : :
2642 peter_e@gmx.net 1966 : 13 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
1967 : :
1968 : 10 : ObjectAddressSet(address, PublicationRelationId, subid);
1969 : :
1970 : 10 : heap_freetuple(tup);
1971 : :
1910 andres@anarazel.de 1972 : 10 : table_close(rel, RowExclusiveLock);
1973 : :
2642 peter_e@gmx.net 1974 : 10 : return address;
1975 : : }
1976 : :
1977 : : /*
1978 : : * Change publication owner -- by OID
1979 : : */
1980 : : void
2642 peter_e@gmx.net 1981 :UBC 0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
1982 : : {
1983 : : HeapTuple tup;
1984 : : Relation rel;
1985 : :
1910 andres@anarazel.de 1986 : 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1987 : :
2642 peter_e@gmx.net 1988 : 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
1989 : :
1990 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
1991 [ # # ]: 0 : ereport(ERROR,
1992 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1993 : : errmsg("publication with OID %u does not exist", subid)));
1994 : :
1995 : 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
1996 : :
1997 : 0 : heap_freetuple(tup);
1998 : :
1910 andres@anarazel.de 1999 : 0 : table_close(rel, RowExclusiveLock);
2642 peter_e@gmx.net 2000 : 0 : }
|