Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "access/xact.h"
22 : #include "catalog/catalog.h"
23 : #include "catalog/dependency.h"
24 : #include "catalog/index.h"
25 : #include "catalog/indexing.h"
26 : #include "catalog/namespace.h"
27 : #include "catalog/partition.h"
28 : #include "catalog/objectaccess.h"
29 : #include "catalog/objectaddress.h"
30 : #include "catalog/pg_inherits.h"
31 : #include "catalog/pg_namespace.h"
32 : #include "catalog/pg_publication.h"
33 : #include "catalog/pg_publication_namespace.h"
34 : #include "catalog/pg_publication_rel.h"
35 : #include "catalog/pg_type.h"
36 : #include "commands/publicationcmds.h"
37 : #include "funcapi.h"
38 : #include "miscadmin.h"
39 : #include "utils/array.h"
40 : #include "utils/builtins.h"
41 : #include "utils/catcache.h"
42 : #include "utils/fmgroids.h"
43 : #include "utils/inval.h"
44 : #include "utils/lsyscache.h"
45 : #include "utils/rel.h"
46 : #include "utils/syscache.h"
47 :
48 : /* Records association between publication and published table */
49 : typedef struct
50 : {
51 : Oid relid; /* OID of published table */
52 : Oid pubid; /* OID of publication that publishes this
53 : * table. */
54 : } published_rel;
55 :
56 : static void publication_translate_columns(Relation targetrel, List *columns,
57 : int *natts, AttrNumber **attrs);
58 :
59 : /*
60 : * Check if relation can be in given publication and throws appropriate
61 : * error if not.
62 : */
63 : static void
2271 peter_e 64 GIC 482 : check_publication_add_relation(Relation targetrel)
65 : {
66 : /* Must be a regular or partitioned table */
1125 peter 67 482 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
367 tomas.vondra 68 71 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
2271 peter_e 69 7 : ereport(ERROR,
70 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71 : errmsg("cannot add relation \"%s\" to publication",
2271 peter_e 72 ECB : RelationGetRelationName(targetrel)),
73 : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
74 :
75 : /* Can't be system table */
2271 peter_e 76 CBC 475 : if (IsCatalogRelation(targetrel))
77 3 : ereport(ERROR,
78 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79 : errmsg("cannot add relation \"%s\" to publication",
80 : RelationGetRelationName(targetrel)),
81 : errdetail("This operation is not supported for system tables.")));
82 :
83 : /* UNLOGGED and TEMP relations cannot be part of publication. */
508 dgustafsson 84 472 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
2271 peter_e 85 3 : ereport(ERROR,
86 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
87 : errmsg("cannot add relation \"%s\" to publication",
88 : RelationGetRelationName(targetrel)),
89 : errdetail("This operation is not supported for temporary tables.")));
508 dgustafsson 90 GIC 469 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
91 3 : ereport(ERROR,
508 dgustafsson 92 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
93 : errmsg("cannot add relation \"%s\" to publication",
94 : RelationGetRelationName(targetrel)),
95 : errdetail("This operation is not supported for unlogged tables.")));
2271 peter_e 96 GIC 466 : }
97 :
529 akapila 98 ECB : /*
99 : * Check if schema can be in given publication and throw appropriate error if
100 : * not.
101 : */
102 : static void
529 akapila 103 GIC 102 : check_publication_add_schema(Oid schemaid)
529 akapila 104 ECB : {
105 : /* Can't be system namespace */
529 akapila 106 GIC 102 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
107 3 : ereport(ERROR,
108 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109 : errmsg("cannot add schema \"%s\" to publication",
110 : get_namespace_name(schemaid)),
529 akapila 111 ECB : errdetail("This operation is not supported for system schemas.")));
112 :
113 : /* Can't be temporary namespace */
529 akapila 114 CBC 99 : if (isAnyTempNamespace(schemaid))
529 akapila 115 LBC 0 : ereport(ERROR,
116 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 : errmsg("cannot add schema \"%s\" to publication",
118 : get_namespace_name(schemaid)),
119 : errdetail("Temporary schemas cannot be replicated.")));
529 akapila 120 GIC 99 : }
121 :
2271 peter_e 122 ECB : /*
2271 peter_e 123 EUB : * Returns if relation represented by oid and Form_pg_class entry
124 : * is publishable.
125 : *
126 : * Does same checks as check_publication_add_relation() above, but does not
127 : * need relation to be opened and also does not throw errors.
2154 tgl 128 ECB : *
129 : * XXX This also excludes all tables with relid < FirstNormalObjectId,
130 : * ie all tables created during initdb. This mainly affects the preinstalled
131 : * information_schema. IsCatalogRelationOid() only excludes tables with
132 : * relid < FirstUnpinnedObjectId, making that test rather redundant,
133 : * but really we should get rid of the FirstNormalObjectId test not
134 : * IsCatalogRelationOid. We can't do so today because we don't want
135 : * information_schema tables to be considered publishable; but this test
136 : * is really inadequate for that, since the information_schema could be
137 : * dropped and reloaded and then it'll be considered publishable. The best
138 : * long-term solution may be to add a "relispublishable" bool to pg_class,
139 : * and depend on that instead of OID checks.
140 : */
141 : static bool
2271 peter_e 142 GIC 292849 : is_publishable_class(Oid relid, Form_pg_class reltuple)
143 : {
1125 peter 144 298291 : return (reltuple->relkind == RELKIND_RELATION ||
367 tomas.vondra 145 5442 : reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
1432 tgl 146 287979 : !IsCatalogRelationOid(relid) &&
2271 peter_e 147 585698 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
148 : relid >= FirstNormalObjectId;
149 : }
2271 peter_e 150 ECB :
151 : /*
152 : * Another variant of is_publishable_class(), taking a Relation.
254 akapila 153 : */
154 : bool
254 akapila 155 GNC 269696 : is_publishable_relation(Relation rel)
156 : {
157 269696 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
158 : }
159 :
160 : /*
161 : * SQL-callable variant of the above
162 : *
163 : * This returns null when the relation does not exist. This is intended to be
164 : * used for example in psql to avoid gratuitous errors when there are
165 : * concurrent catalog changes.
166 : */
167 : Datum
250 168 2246 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
169 : {
170 2246 : Oid relid = PG_GETARG_OID(0);
171 : HeapTuple tuple;
172 : bool result;
173 :
174 2246 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
175 2246 : if (!HeapTupleIsValid(tuple))
250 akapila 176 UNC 0 : PG_RETURN_NULL();
250 akapila 177 GNC 2246 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
178 2246 : ReleaseSysCache(tuple);
179 2246 : PG_RETURN_BOOL(result);
180 : }
181 :
182 : /*
183 : * Returns true if the ancestor is in the list of published relations.
184 : * Otherwise, returns false.
185 : */
186 : static bool
11 187 99 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
188 : {
189 : ListCell *lc;
190 :
191 340 : foreach(lc, table_infos)
192 : {
193 281 : Oid relid = ((published_rel *) lfirst(lc))->relid;
194 :
195 281 : if (relid == ancestor)
196 40 : return true;
197 : }
198 :
199 59 : return false;
200 : }
201 :
202 : /*
203 : * Filter out the partitions whose parent tables are also present in the list.
204 : */
205 : static void
206 160 : filter_partitions(List *table_infos)
207 : {
208 : ListCell *lc;
209 :
210 478 : foreach(lc, table_infos)
529 akapila 211 ECB : {
529 akapila 212 GIC 318 : bool skip = false;
529 akapila 213 CBC 318 : List *ancestors = NIL;
214 : ListCell *lc2;
11 akapila 215 GNC 318 : published_rel *table_info = (published_rel *) lfirst(lc);
216 :
217 318 : if (get_rel_relispartition(table_info->relid))
218 99 : ancestors = get_partition_ancestors(table_info->relid);
219 :
529 akapila 220 GIC 377 : foreach(lc2, ancestors)
221 : {
222 99 : Oid ancestor = lfirst_oid(lc2);
223 :
11 akapila 224 GNC 99 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
225 : {
529 akapila 226 CBC 40 : skip = true;
529 akapila 227 GIC 40 : break;
228 : }
229 : }
529 akapila 230 ECB :
11 akapila 231 GNC 318 : if (skip)
232 40 : table_infos = foreach_delete_current(table_infos, lc);
529 akapila 233 ECB : }
529 akapila 234 GIC 160 : }
235 :
487 akapila 236 ECB : /*
237 : * Returns true if any schema is associated with the publication, false if no
238 : * schema is associated with the publication.
239 : */
240 : bool
487 akapila 241 CBC 125 : is_schema_publication(Oid pubid)
242 : {
243 : Relation pubschsrel;
487 akapila 244 ECB : ScanKeyData scankey;
245 : SysScanDesc scan;
246 : HeapTuple tup;
487 akapila 247 GIC 125 : bool result = false;
248 :
249 125 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
250 125 : ScanKeyInit(&scankey,
487 akapila 251 ECB : Anum_pg_publication_namespace_pnpubid,
252 : BTEqualStrategyNumber, F_OIDEQ,
253 : ObjectIdGetDatum(pubid));
254 :
487 akapila 255 CBC 125 : scan = systable_beginscan(pubschsrel,
256 : PublicationNamespacePnnspidPnpubidIndexId,
487 akapila 257 ECB : true, NULL, 1, &scankey);
487 akapila 258 CBC 125 : tup = systable_getnext(scan);
487 akapila 259 GIC 125 : result = HeapTupleIsValid(tup);
487 akapila 260 ECB :
487 akapila 261 GIC 125 : systable_endscan(scan);
487 akapila 262 CBC 125 : table_close(pubschsrel, AccessShareLock);
487 akapila 263 ECB :
487 akapila 264 GIC 125 : return result;
487 akapila 265 ECB : }
266 :
267 : /*
268 : * Gets the relations based on the publication partition option for a specified
269 : * relation.
564 270 : */
271 : List *
564 akapila 272 CBC 2465 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
564 akapila 273 ECB : Oid relid)
274 : {
564 akapila 275 GIC 2465 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
276 : pub_partopt != PUBLICATION_PART_ROOT)
277 467 : {
564 akapila 278 CBC 467 : List *all_parts = find_all_inheritors(relid, NoLock,
279 : NULL);
280 :
281 467 : if (pub_partopt == PUBLICATION_PART_ALL)
282 370 : result = list_concat(result, all_parts);
564 akapila 283 GIC 97 : else if (pub_partopt == PUBLICATION_PART_LEAF)
564 akapila 284 ECB : {
285 : ListCell *lc;
286 :
564 akapila 287 CBC 355 : foreach(lc, all_parts)
288 : {
564 akapila 289 GIC 258 : Oid partOid = lfirst_oid(lc);
290 :
291 258 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
292 161 : result = lappend_oid(result, partOid);
293 : }
294 : }
564 akapila 295 ECB : else
564 akapila 296 UIC 0 : Assert(false);
297 : }
564 akapila 298 ECB : else
564 akapila 299 GIC 1998 : result = lappend_oid(result, relid);
564 akapila 300 ECB :
564 akapila 301 CBC 2465 : return result;
302 : }
303 :
411 akapila 304 ECB : /*
305 : * Returns the relid of the topmost ancestor that is published via this
389 tomas.vondra 306 : * publication if any and set its ancestor level to ancestor_level,
307 : * otherwise returns InvalidOid.
308 : *
309 : * The ancestor_level value allows us to compare the results for multiple
310 : * publications, and decide which value is higher up.
311 : *
411 akapila 312 : * Note that the list of ancestors should be ordered such that the topmost
313 : * ancestor is at the end of the list.
314 : */
315 : Oid
389 tomas.vondra 316 GIC 216 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
317 : {
318 : ListCell *lc;
411 akapila 319 GBC 216 : Oid topmost_relid = InvalidOid;
389 tomas.vondra 320 GIC 216 : int level = 0;
321 :
411 akapila 322 ECB : /*
323 : * Find the "topmost" ancestor that is in this publication.
324 : */
411 akapila 325 GIC 439 : foreach(lc, ancestors)
326 : {
327 223 : Oid ancestor = lfirst_oid(lc);
328 223 : List *apubids = GetRelationPublications(ancestor);
329 223 : List *aschemaPubids = NIL;
330 :
389 tomas.vondra 331 223 : level++;
332 :
411 akapila 333 223 : if (list_member_oid(apubids, puboid))
334 : {
335 146 : topmost_relid = ancestor;
336 :
389 tomas.vondra 337 146 : if (ancestor_level)
338 35 : *ancestor_level = level;
389 tomas.vondra 339 ECB : }
340 : else
341 : {
367 tomas.vondra 342 CBC 77 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
411 akapila 343 77 : if (list_member_oid(aschemaPubids, puboid))
344 : {
411 akapila 345 GIC 5 : topmost_relid = ancestor;
346 :
389 tomas.vondra 347 5 : if (ancestor_level)
389 tomas.vondra 348 CBC 5 : *ancestor_level = level;
349 : }
411 akapila 350 ECB : }
351 :
411 akapila 352 CBC 223 : list_free(apubids);
411 akapila 353 GIC 223 : list_free(aschemaPubids);
411 akapila 354 ECB : }
355 :
411 akapila 356 CBC 216 : return topmost_relid;
357 : }
411 akapila 358 ECB :
359 : /*
2271 peter_e 360 : * Insert new publication / relation mapping.
361 : */
362 : ObjectAddress
411 akapila 363 GIC 493 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
364 : bool if_not_exists)
2271 peter_e 365 ECB : {
366 : Relation rel;
367 : HeapTuple tup;
368 : Datum values[Natts_pg_publication_rel];
369 : bool nulls[Natts_pg_publication_rel];
411 akapila 370 CBC 493 : Relation targetrel = pri->relation;
371 493 : Oid relid = RelationGetRelid(targetrel);
372 : Oid pubreloid;
2271 peter_e 373 GIC 493 : Publication *pub = GetPublication(pubid);
379 tomas.vondra 374 493 : AttrNumber *attarray = NULL;
379 tomas.vondra 375 CBC 493 : int natts = 0;
2153 bruce 376 ECB : ObjectAddress myself,
377 : referenced;
564 akapila 378 GIC 493 : List *relids = NIL;
2271 peter_e 379 ECB :
1539 andres 380 GIC 493 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
381 :
382 : /*
383 : * Check for duplicates. Note that this does not really prevent
384 : * duplicates, it's here just to provide nicer error message in common
385 : * case. The real protection is the unique key on the catalog.
2271 peter_e 386 ECB : */
2271 peter_e 387 GIC 493 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
388 : ObjectIdGetDatum(pubid)))
389 : {
1539 andres 390 11 : table_close(rel, RowExclusiveLock);
391 :
2271 peter_e 392 11 : if (if_not_exists)
2271 peter_e 393 CBC 8 : return InvalidObjectAddress;
2271 peter_e 394 ECB :
2271 peter_e 395 GIC 3 : ereport(ERROR,
2271 peter_e 396 ECB : (errcode(ERRCODE_DUPLICATE_OBJECT),
2118 tgl 397 : errmsg("relation \"%s\" is already member of publication \"%s\"",
411 akapila 398 : RelationGetRelationName(targetrel), pub->name)));
399 : }
400 :
411 akapila 401 CBC 482 : check_publication_add_relation(targetrel);
402 :
379 tomas.vondra 403 ECB : /*
404 : * Translate column names to attnums and make sure the column list
405 : * contains only allowed elements (no system or generated columns etc.).
406 : * Also build an array of attnums, for storing in the catalog.
407 : */
379 tomas.vondra 408 GIC 466 : publication_translate_columns(pri->relation, pri->columns,
409 : &natts, &attarray);
379 tomas.vondra 410 ECB :
411 : /* Form a tuple. */
2271 peter_e 412 GIC 457 : memset(values, 0, sizeof(values));
2271 peter_e 413 CBC 457 : memset(nulls, false, sizeof(nulls));
414 :
465 alvherre 415 457 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
465 alvherre 416 ECB : Anum_pg_publication_rel_oid);
465 alvherre 417 GIC 457 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
2271 peter_e 418 CBC 457 : values[Anum_pg_publication_rel_prpubid - 1] =
2271 peter_e 419 GIC 457 : ObjectIdGetDatum(pubid);
420 457 : values[Anum_pg_publication_rel_prrelid - 1] =
421 457 : ObjectIdGetDatum(relid);
422 :
423 : /* Add qualifications, if available */
411 akapila 424 CBC 457 : if (pri->whereClause != NULL)
411 akapila 425 GIC 149 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
426 : else
427 308 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
428 :
429 : /* Add column list, if available */
379 tomas.vondra 430 457 : if (pri->columns)
379 tomas.vondra 431 CBC 136 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
432 : else
379 tomas.vondra 433 GIC 321 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
434 :
2271 peter_e 435 CBC 457 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
2271 peter_e 436 ECB :
437 : /* Insert tuple into catalog. */
1601 andres 438 CBC 457 : CatalogTupleInsert(rel, tup);
2271 peter_e 439 GIC 457 : heap_freetuple(tup);
2271 peter_e 440 ECB :
465 alvherre 441 : /* Register dependencies as needed */
465 alvherre 442 CBC 457 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
2271 peter_e 443 ECB :
444 : /* Add dependency on the publication */
2271 peter_e 445 GIC 457 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
446 457 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
2271 peter_e 447 ECB :
448 : /* Add dependency on the relation */
2271 peter_e 449 GIC 457 : ObjectAddressSet(referenced, RelationRelationId, relid);
2271 peter_e 450 CBC 457 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
451 :
452 : /* Add dependency on the objects mentioned in the qualifications */
411 akapila 453 457 : if (pri->whereClause)
454 149 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
455 : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
411 akapila 456 ECB : false);
457 :
379 tomas.vondra 458 : /* Add dependency on the columns, if any are listed */
379 tomas.vondra 459 GIC 695 : for (int i = 0; i < natts; i++)
460 : {
379 tomas.vondra 461 CBC 238 : ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
462 238 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
463 : }
464 :
2271 peter_e 465 ECB : /* Close the table. */
1539 andres 466 GIC 457 : table_close(rel, RowExclusiveLock);
467 :
564 akapila 468 ECB : /*
469 : * Invalidate relcache so that publication info is rebuilt.
470 : *
471 : * For the partitioned tables, we must invalidate all partitions contained
472 : * in the respective partition hierarchies, not just the one explicitly
473 : * mentioned in the publication. This is required because we implicitly
474 : * publish the child tables when the parent table is published.
475 : */
564 akapila 476 CBC 457 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
564 akapila 477 ECB : relid);
478 :
564 akapila 479 GIC 457 : InvalidatePublicationRels(relids);
480 :
2271 peter_e 481 457 : return myself;
2271 peter_e 482 ECB : }
483 :
379 tomas.vondra 484 : /* qsort comparator for attnums */
485 : static int
379 tomas.vondra 486 GIC 102 : compare_int16(const void *a, const void *b)
487 : {
488 102 : int av = *(const int16 *) a;
379 tomas.vondra 489 CBC 102 : int bv = *(const int16 *) b;
490 :
491 : /* this can't overflow if int is wider than int16 */
379 tomas.vondra 492 GIC 102 : return (av - bv);
493 : }
494 :
495 : /*
496 : * Translate a list of column names to an array of attribute numbers
497 : * and a Bitmapset with them; verify that each attribute is appropriate
498 : * to have in a publication column list (no system or generated attributes,
379 tomas.vondra 499 ECB : * no duplicates). Additional checks with replica identity are done later;
500 : * see check_publication_columns.
501 : *
502 : * Note that the attribute numbers are *not* offset by
503 : * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
504 : * is okay.
505 : */
506 : static void
379 tomas.vondra 507 GIC 466 : publication_translate_columns(Relation targetrel, List *columns,
508 : int *natts, AttrNumber **attrs)
379 tomas.vondra 509 ECB : {
379 tomas.vondra 510 GIC 466 : AttrNumber *attarray = NULL;
379 tomas.vondra 511 CBC 466 : Bitmapset *set = NULL;
379 tomas.vondra 512 ECB : ListCell *lc;
379 tomas.vondra 513 GIC 466 : int n = 0;
514 466 : TupleDesc tupdesc = RelationGetDescr(targetrel);
379 tomas.vondra 515 ECB :
516 : /* Bail out when no column list defined. */
379 tomas.vondra 517 GIC 466 : if (!columns)
518 321 : return;
519 :
520 : /*
521 : * Translate list of columns to attnums. We prohibit system attributes and
522 : * make sure there are no duplicate columns.
523 : */
524 145 : attarray = palloc(sizeof(AttrNumber) * list_length(columns));
525 392 : foreach(lc, columns)
526 : {
527 256 : char *colname = strVal(lfirst(lc));
528 256 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
529 :
379 tomas.vondra 530 CBC 256 : if (attnum == InvalidAttrNumber)
379 tomas.vondra 531 GIC 3 : ereport(ERROR,
532 : errcode(ERRCODE_UNDEFINED_COLUMN),
379 tomas.vondra 533 ECB : errmsg("column \"%s\" of relation \"%s\" does not exist",
534 : colname, RelationGetRelationName(targetrel)));
535 :
379 tomas.vondra 536 CBC 253 : if (!AttrNumberIsForUserDefinedAttr(attnum))
537 3 : ereport(ERROR,
538 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
539 : errmsg("cannot use system column \"%s\" in publication column list",
379 tomas.vondra 540 ECB : colname));
541 :
379 tomas.vondra 542 GIC 250 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
543 3 : ereport(ERROR,
544 : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
545 : errmsg("cannot use generated column \"%s\" in publication column list",
546 : colname));
379 tomas.vondra 547 ECB :
379 tomas.vondra 548 CBC 247 : if (bms_is_member(attnum, set))
379 tomas.vondra 549 UIC 0 : ereport(ERROR,
379 tomas.vondra 550 ECB : errcode(ERRCODE_DUPLICATE_OBJECT),
551 : errmsg("duplicate column \"%s\" in publication column list",
552 : colname));
553 :
379 tomas.vondra 554 CBC 247 : set = bms_add_member(set, attnum);
379 tomas.vondra 555 GIC 247 : attarray[n++] = attnum;
556 : }
557 :
558 : /* Be tidy, so that the catalog representation is always sorted */
379 tomas.vondra 559 CBC 136 : qsort(attarray, n, sizeof(AttrNumber), compare_int16);
379 tomas.vondra 560 ECB :
379 tomas.vondra 561 GIC 136 : *natts = n;
562 136 : *attrs = attarray;
563 :
564 136 : bms_free(set);
379 tomas.vondra 565 ECB : }
566 :
567 : /*
568 : * Transform a column list (represented by an array Datum) to a bitmapset.
569 : *
570 : * If columns isn't NULL, add the column numbers to that set.
tgl 571 : *
379 tgl 572 EUB : * If mcxt isn't NULL, build the bitmapset in that context.
573 : */
574 : Bitmapset *
379 tomas.vondra 575 GIC 214 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
576 : {
379 tomas.vondra 577 CBC 214 : Bitmapset *result = NULL;
379 tomas.vondra 578 ECB : ArrayType *arr;
579 : int nelems;
580 : int16 *elems;
332 tgl 581 GIC 214 : MemoryContext oldcxt = NULL;
379 tomas.vondra 582 ECB :
583 : /*
332 tgl 584 : * If an existing bitmap was provided, use it. Otherwise just use NULL and
585 : * build a new bitmap.
586 : */
379 tomas.vondra 587 CBC 214 : if (columns)
379 tomas.vondra 588 UIC 0 : result = columns;
589 :
379 tomas.vondra 590 GIC 214 : arr = DatumGetArrayTypeP(pubcols);
591 214 : nelems = ARR_DIMS(arr)[0];
592 214 : elems = (int16 *) ARR_DATA_PTR(arr);
593 :
594 : /* If a memory context was specified, switch to it. */
595 214 : if (mcxt)
596 37 : oldcxt = MemoryContextSwitchTo(mcxt);
597 :
379 tomas.vondra 598 CBC 594 : for (int i = 0; i < nelems; i++)
379 tomas.vondra 599 GIC 380 : result = bms_add_member(result, elems[i]);
379 tomas.vondra 600 ECB :
379 tomas.vondra 601 GIC 214 : if (mcxt)
602 37 : MemoryContextSwitchTo(oldcxt);
603 :
379 tomas.vondra 604 CBC 214 : return result;
605 : }
606 :
607 : /*
608 : * Insert new publication / schema mapping.
609 : */
529 akapila 610 ECB : ObjectAddress
367 tomas.vondra 611 GBC 111 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
612 : {
529 akapila 613 ECB : Relation rel;
614 : HeapTuple tup;
615 : Datum values[Natts_pg_publication_namespace];
616 : bool nulls[Natts_pg_publication_namespace];
617 : Oid psschid;
529 akapila 618 CBC 111 : Publication *pub = GetPublication(pubid);
619 111 : List *schemaRels = NIL;
620 : ObjectAddress myself,
529 akapila 621 ECB : referenced;
622 :
529 akapila 623 GIC 111 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
529 akapila 624 ECB :
625 : /*
626 : * Check for duplicates. Note that this does not really prevent
627 : * duplicates, it's here just to provide nicer error message in common
628 : * case. The real protection is the unique key on the catalog.
629 : */
367 tomas.vondra 630 GIC 111 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
631 : ObjectIdGetDatum(schemaid),
632 : ObjectIdGetDatum(pubid)))
633 : {
529 akapila 634 CBC 9 : table_close(rel, RowExclusiveLock);
635 :
529 akapila 636 GIC 9 : if (if_not_exists)
637 6 : return InvalidObjectAddress;
638 :
639 3 : ereport(ERROR,
640 : (errcode(ERRCODE_DUPLICATE_OBJECT),
529 akapila 641 ECB : errmsg("schema \"%s\" is already member of publication \"%s\"",
642 : get_namespace_name(schemaid), pub->name)));
643 : }
644 :
529 akapila 645 GIC 102 : check_publication_add_schema(schemaid);
529 akapila 646 ECB :
647 : /* Form a tuple */
529 akapila 648 GIC 99 : memset(values, 0, sizeof(values));
649 99 : memset(nulls, false, sizeof(nulls));
650 :
651 99 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
652 : Anum_pg_publication_namespace_oid);
529 akapila 653 CBC 99 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
529 akapila 654 GIC 99 : values[Anum_pg_publication_namespace_pnpubid - 1] =
655 99 : ObjectIdGetDatum(pubid);
656 99 : values[Anum_pg_publication_namespace_pnnspid - 1] =
529 akapila 657 CBC 99 : ObjectIdGetDatum(schemaid);
658 :
659 99 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
529 akapila 660 ECB :
661 : /* Insert tuple into catalog */
529 akapila 662 CBC 99 : CatalogTupleInsert(rel, tup);
529 akapila 663 GIC 99 : heap_freetuple(tup);
664 :
665 99 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
666 :
667 : /* Add dependency on the publication */
529 akapila 668 CBC 99 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
529 akapila 669 GIC 99 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
670 :
529 akapila 671 ECB : /* Add dependency on the schema */
529 akapila 672 CBC 99 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
529 akapila 673 GIC 99 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
529 akapila 674 ECB :
675 : /* Close the table */
529 akapila 676 CBC 99 : table_close(rel, RowExclusiveLock);
529 akapila 677 ECB :
678 : /*
679 : * Invalidate relcache so that publication info is rebuilt. See
680 : * publication_add_relation for why we need to consider all the
681 : * partitions.
682 : */
367 tomas.vondra 683 GIC 99 : schemaRels = GetSchemaPublicationRelations(schemaid,
684 : PUBLICATION_PART_ALL);
529 akapila 685 CBC 99 : InvalidatePublicationRels(schemaRels);
529 akapila 686 ECB :
529 akapila 687 GIC 99 : return myself;
529 akapila 688 ECB : }
689 :
690 : /* Gets list of publication oids for a relation */
2271 peter_e 691 : List *
2271 peter_e 692 CBC 5246 : GetRelationPublications(Oid relid)
693 : {
2153 bruce 694 GIC 5246 : List *result = NIL;
2153 bruce 695 ECB : CatCList *pubrellist;
696 : int i;
697 :
698 : /* Find all publications associated with the relation. */
2271 peter_e 699 CBC 5246 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
700 : ObjectIdGetDatum(relid));
2271 peter_e 701 GIC 5935 : for (i = 0; i < pubrellist->n_members; i++)
702 : {
703 689 : HeapTuple tup = &pubrellist->members[i]->tuple;
704 689 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
705 :
2271 peter_e 706 CBC 689 : result = lappend_oid(result, pubid);
707 : }
2271 peter_e 708 ECB :
2271 peter_e 709 GIC 5246 : ReleaseSysCacheList(pubrellist);
2271 peter_e 710 ECB :
2271 peter_e 711 GIC 5246 : return result;
712 : }
713 :
714 : /*
2271 peter_e 715 ECB : * Gets list of relation oids for a publication.
716 : *
367 tomas.vondra 717 : * This should only be used FOR TABLE publications, the FOR ALL TABLES
718 : * should use GetAllTablesPublicationRelations().
719 : */
720 : List *
367 tomas.vondra 721 GIC 1006 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
2271 peter_e 722 ECB : {
723 : List *result;
2153 bruce 724 : Relation pubrelsrel;
725 : ScanKeyData scankey;
726 : SysScanDesc scan;
727 : HeapTuple tup;
728 :
2271 peter_e 729 : /* Find all publications associated with the relation. */
1539 andres 730 GIC 1006 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
731 :
2271 peter_e 732 CBC 1006 : ScanKeyInit(&scankey,
733 : Anum_pg_publication_rel_prpubid,
2271 peter_e 734 ECB : BTEqualStrategyNumber, F_OIDEQ,
735 : ObjectIdGetDatum(pubid));
736 :
452 alvherre 737 GIC 1006 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
738 : true, NULL, 1, &scankey);
739 :
2271 peter_e 740 1006 : result = NIL;
741 2352 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
742 : {
743 : Form_pg_publication_rel pubrel;
2271 peter_e 744 ECB :
2271 peter_e 745 GIC 1346 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
367 tomas.vondra 746 1346 : result = GetPubPartitionOptionRelations(result, pub_partopt,
747 : pubrel->prrelid);
748 : }
749 :
2271 peter_e 750 1006 : systable_endscan(scan);
1539 andres 751 1006 : table_close(pubrelsrel, AccessShareLock);
752 :
487 akapila 753 ECB : /* Now sort and de-duplicate the result list */
487 akapila 754 GIC 1006 : list_sort(result, list_oid_cmp);
487 akapila 755 CBC 1006 : list_deduplicate_oid(result);
756 :
2271 peter_e 757 GIC 1006 : return result;
758 : }
759 :
2271 peter_e 760 ECB : /*
761 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
762 : */
763 : List *
2271 peter_e 764 CBC 3622 : GetAllTablesPublications(void)
765 : {
766 : List *result;
767 : Relation rel;
2153 bruce 768 ECB : ScanKeyData scankey;
769 : SysScanDesc scan;
770 : HeapTuple tup;
771 :
772 : /* Find all publications that are marked as for all tables. */
1539 andres 773 CBC 3622 : rel = table_open(PublicationRelationId, AccessShareLock);
2271 peter_e 774 ECB :
2271 peter_e 775 GIC 3622 : ScanKeyInit(&scankey,
776 : Anum_pg_publication_puballtables,
2271 peter_e 777 ECB : BTEqualStrategyNumber, F_BOOLEQ,
778 : BoolGetDatum(true));
779 :
2271 peter_e 780 CBC 3622 : scan = systable_beginscan(rel, InvalidOid, false,
781 : NULL, 1, &scankey);
782 :
2271 peter_e 783 GIC 3622 : result = NIL;
784 3695 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
785 : {
1418 tgl 786 73 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
1601 andres 787 ECB :
1601 andres 788 GIC 73 : result = lappend_oid(result, oid);
789 : }
790 :
2271 peter_e 791 3622 : systable_endscan(scan);
1539 andres 792 3622 : table_close(rel, AccessShareLock);
793 :
2271 peter_e 794 3622 : return result;
795 : }
2271 peter_e 796 ECB :
797 : /*
798 : * Gets list of all relation published by FOR ALL TABLES publication(s).
799 : *
800 : * If the publication publishes partition changes via their respective root
801 : * partitioned tables, we must exclude partitions in favor of including the
802 : * root partitioned tables.
803 : */
804 : List *
1096 peter 805 GIC 125 : GetAllTablesPublicationRelations(bool pubviaroot)
2271 peter_e 806 ECB : {
807 : Relation classRel;
808 : ScanKeyData key[1];
1490 andres 809 : TableScanDesc scan;
810 : HeapTuple tuple;
2271 peter_e 811 CBC 125 : List *result = NIL;
812 :
1539 andres 813 GIC 125 : classRel = table_open(RelationRelationId, AccessShareLock);
2271 peter_e 814 ECB :
2271 peter_e 815 CBC 125 : ScanKeyInit(&key[0],
816 : Anum_pg_class_relkind,
2271 peter_e 817 ECB : BTEqualStrategyNumber, F_CHAREQ,
818 : CharGetDatum(RELKIND_RELATION));
819 :
1490 andres 820 GIC 125 : scan = table_beginscan_catalog(classRel, 1, key);
821 :
2271 peter_e 822 9325 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
823 : {
2153 bruce 824 9200 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1601 andres 825 9200 : Oid relid = relForm->oid;
826 :
1096 peter 827 9200 : if (is_publishable_class(relid, relForm) &&
1096 peter 828 CBC 694 : !(relForm->relispartition && pubviaroot))
2271 peter_e 829 GIC 603 : result = lappend_oid(result, relid);
830 : }
831 :
1490 andres 832 125 : table_endscan(scan);
833 :
1096 peter 834 CBC 125 : if (pubviaroot)
835 : {
836 13 : ScanKeyInit(&key[0],
837 : Anum_pg_class_relkind,
1096 peter 838 ECB : BTEqualStrategyNumber, F_CHAREQ,
839 : CharGetDatum(RELKIND_PARTITIONED_TABLE));
840 :
1096 peter 841 GIC 13 : scan = table_beginscan_catalog(classRel, 1, key);
842 :
1096 peter 843 CBC 78 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
844 : {
845 65 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1096 peter 846 GIC 65 : Oid relid = relForm->oid;
1096 peter 847 ECB :
1096 peter 848 CBC 65 : if (is_publishable_class(relid, relForm) &&
1096 peter 849 GIC 65 : !relForm->relispartition)
1096 peter 850 CBC 52 : result = lappend_oid(result, relid);
1096 peter 851 ECB : }
852 :
1096 peter 853 GIC 13 : table_endscan(scan);
854 : }
2271 peter_e 855 ECB :
1093 peter 856 GIC 125 : table_close(classRel, AccessShareLock);
2271 peter_e 857 CBC 125 : return result;
858 : }
2271 peter_e 859 ECB :
860 : /*
861 : * Gets the list of schema oids for a publication.
862 : *
863 : * This should only be used FOR TABLES IN SCHEMA publications.
529 akapila 864 : */
865 : List *
367 tomas.vondra 866 CBC 971 : GetPublicationSchemas(Oid pubid)
867 : {
529 akapila 868 971 : List *result = NIL;
529 akapila 869 ECB : Relation pubschsrel;
870 : ScanKeyData scankey;
871 : SysScanDesc scan;
872 : HeapTuple tup;
873 :
874 : /* Find all schemas associated with the publication */
529 akapila 875 GIC 971 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
529 akapila 876 ECB :
367 tomas.vondra 877 GIC 971 : ScanKeyInit(&scankey,
878 : Anum_pg_publication_namespace_pnpubid,
529 akapila 879 ECB : BTEqualStrategyNumber, F_OIDEQ,
880 : ObjectIdGetDatum(pubid));
881 :
529 akapila 882 GIC 971 : scan = systable_beginscan(pubschsrel,
883 : PublicationNamespacePnnspidPnpubidIndexId,
884 : true, NULL, 1, &scankey);
885 1021 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
886 : {
887 : Form_pg_publication_namespace pubsch;
888 :
529 akapila 889 CBC 50 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
890 :
891 50 : result = lappend_oid(result, pubsch->pnnspid);
892 : }
893 :
529 akapila 894 GIC 971 : systable_endscan(scan);
895 971 : table_close(pubschsrel, AccessShareLock);
896 :
897 971 : return result;
529 akapila 898 ECB : }
899 :
900 : /*
901 : * Gets the list of publication oids associated with a specified schema.
902 : */
903 : List *
367 tomas.vondra 904 GIC 5084 : GetSchemaPublications(Oid schemaid)
529 akapila 905 ECB : {
529 akapila 906 GIC 5084 : List *result = NIL;
907 : CatCList *pubschlist;
529 akapila 908 ECB : int i;
909 :
910 : /* Find all publications associated with the schema */
529 akapila 911 GIC 5084 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
529 akapila 912 ECB : ObjectIdGetDatum(schemaid));
529 akapila 913 GIC 5141 : for (i = 0; i < pubschlist->n_members; i++)
529 akapila 914 ECB : {
529 akapila 915 GIC 57 : HeapTuple tup = &pubschlist->members[i]->tuple;
916 57 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
529 akapila 917 ECB :
529 akapila 918 CBC 57 : result = lappend_oid(result, pubid);
919 : }
529 akapila 920 ECB :
529 akapila 921 GIC 5084 : ReleaseSysCacheList(pubschlist);
922 :
923 5084 : return result;
924 : }
925 :
926 : /*
529 akapila 927 ECB : * Get the list of publishable relation oids for a specified schema.
928 : */
929 : List *
367 tomas.vondra 930 GIC 230 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
931 : {
932 : Relation classRel;
933 : ScanKeyData key[1];
529 akapila 934 ECB : TableScanDesc scan;
935 : HeapTuple tuple;
529 akapila 936 CBC 230 : List *result = NIL;
937 :
938 230 : Assert(OidIsValid(schemaid));
529 akapila 939 ECB :
529 akapila 940 GIC 230 : classRel = table_open(RelationRelationId, AccessShareLock);
529 akapila 941 ECB :
529 akapila 942 GIC 230 : ScanKeyInit(&key[0],
943 : Anum_pg_class_relnamespace,
529 akapila 944 ECB : BTEqualStrategyNumber, F_OIDEQ,
945 : schemaid);
946 :
947 : /* get all the relations present in the specified schema */
529 akapila 948 GIC 230 : scan = table_beginscan_catalog(classRel, 1, key);
949 11872 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
950 : {
951 11642 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
952 11642 : Oid relid = relForm->oid;
529 akapila 953 ECB : char relkind;
954 :
529 akapila 955 GIC 11642 : if (!is_publishable_class(relid, relForm))
956 4640 : continue;
957 :
958 7002 : relkind = get_rel_relkind(relid);
367 tomas.vondra 959 CBC 7002 : if (relkind == RELKIND_RELATION)
367 tomas.vondra 960 GIC 6732 : result = lappend_oid(result, relid);
367 tomas.vondra 961 CBC 270 : else if (relkind == RELKIND_PARTITIONED_TABLE)
962 : {
529 akapila 963 270 : List *partitionrels = NIL;
964 :
529 akapila 965 ECB : /*
966 : * It is quite possible that some of the partitions are in a
967 : * different schema than the parent table, so we need to get such
968 : * partitions separately.
969 : */
529 akapila 970 GIC 270 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
529 akapila 971 ECB : pub_partopt,
972 : relForm->oid);
529 akapila 973 GIC 270 : result = list_concat_unique_oid(result, partitionrels);
529 akapila 974 ECB : }
975 : }
976 :
529 akapila 977 GIC 230 : table_endscan(scan);
529 akapila 978 CBC 230 : table_close(classRel, AccessShareLock);
979 230 : return result;
980 : }
529 akapila 981 ECB :
982 : /*
199 alvherre 983 : * Gets the list of all relations published by FOR TABLES IN SCHEMA
367 tomas.vondra 984 : * publication.
985 : */
529 akapila 986 : List *
367 tomas.vondra 987 GIC 775 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
988 : {
529 akapila 989 775 : List *result = NIL;
367 tomas.vondra 990 775 : List *pubschemalist = GetPublicationSchemas(pubid);
991 : ListCell *cell;
992 :
529 akapila 993 CBC 810 : foreach(cell, pubschemalist)
994 : {
529 akapila 995 GIC 35 : Oid schemaid = lfirst_oid(cell);
529 akapila 996 CBC 35 : List *schemaRels = NIL;
997 :
367 tomas.vondra 998 GIC 35 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
529 akapila 999 35 : result = list_concat(result, schemaRels);
529 akapila 1000 ECB : }
1001 :
529 akapila 1002 CBC 775 : return result;
1003 : }
1004 :
1005 : /*
1006 : * Get publication using oid
1007 : *
1008 : * The Publication struct and its data are palloc'ed here.
1009 : */
2271 peter_e 1010 ECB : Publication *
2271 peter_e 1011 GIC 3433 : GetPublication(Oid pubid)
2271 peter_e 1012 ECB : {
2153 bruce 1013 : HeapTuple tup;
1014 : Publication *pub;
1015 : Form_pg_publication pubform;
2271 peter_e 1016 :
2271 peter_e 1017 GIC 3433 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2271 peter_e 1018 CBC 3433 : if (!HeapTupleIsValid(tup))
2271 peter_e 1019 LBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1020 :
2271 peter_e 1021 CBC 3433 : pubform = (Form_pg_publication) GETSTRUCT(tup);
2271 peter_e 1022 ECB :
2271 peter_e 1023 GIC 3433 : pub = (Publication *) palloc(sizeof(Publication));
1024 3433 : pub->oid = pubid;
2271 peter_e 1025 CBC 3433 : pub->name = pstrdup(NameStr(pubform->pubname));
2271 peter_e 1026 GIC 3433 : pub->alltables = pubform->puballtables;
1027 3433 : pub->pubactions.pubinsert = pubform->pubinsert;
1028 3433 : pub->pubactions.pubupdate = pubform->pubupdate;
1029 3433 : pub->pubactions.pubdelete = pubform->pubdelete;
1828 1030 3433 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1096 peter 1031 3433 : pub->pubviaroot = pubform->pubviaroot;
1032 :
2271 peter_e 1033 3433 : ReleaseSysCache(tup);
2271 peter_e 1034 ECB :
2271 peter_e 1035 GIC 3433 : return pub;
1036 : }
1037 :
1038 : /*
2271 peter_e 1039 ECB : * Get Publication using name.
1040 : */
2271 peter_e 1041 EUB : Publication *
2271 peter_e 1042 GIC 1038 : GetPublicationByName(const char *pubname, bool missing_ok)
2271 peter_e 1043 ECB : {
1044 : Oid oid;
1045 :
1203 alvherre 1046 CBC 1038 : oid = get_publication_oid(pubname, missing_ok);
2271 peter_e 1047 ECB :
1203 alvherre 1048 CBC 1036 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
2271 peter_e 1049 ECB : }
1050 :
1051 : /*
1052 : * Get information of the tables in the given publication array.
11 akapila 1053 : *
1054 : * Returns pubid, relid, column list, row filter for each table.
1055 : */
1056 : Datum
2271 peter_e 1057 GIC 2604 : pg_get_publication_tables(PG_FUNCTION_ARGS)
2271 peter_e 1058 ECB : {
1059 : #define NUM_PUBLICATION_TABLES_ELEM 4
1060 : FuncCallContext *funcctx;
11 akapila 1061 GNC 2604 : List *table_infos = NIL;
1062 :
1063 : /* stuff done only on the first call of the function */
2271 peter_e 1064 CBC 2604 : if (SRF_IS_FIRSTCALL())
1065 : {
1066 : TupleDesc tupdesc;
2271 peter_e 1067 ECB : MemoryContext oldcontext;
1068 : ArrayType *arr;
1069 : Datum *elems;
1070 : int nelems,
1071 : i;
11 akapila 1072 GNC 811 : bool viaroot = false;
1073 :
1074 : /* create a function context for cross-call persistence */
2271 peter_e 1075 GIC 811 : funcctx = SRF_FIRSTCALL_INIT();
1076 :
1077 : /* switch to memory context appropriate for multiple function calls */
1078 811 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1079 :
1080 : /*
1081 : * Deconstruct the parameter into elements where each element is a
1082 : * publication name.
1083 : */
11 akapila 1084 GNC 811 : arr = PG_GETARG_ARRAYTYPE_P(0);
1085 811 : deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
1086 : &elems, NULL, &nelems);
11 akapila 1087 ECB :
1088 : /* Get Oids of tables from each publication. */
11 akapila 1089 GNC 1666 : for (i = 0; i < nelems; i++)
1090 : {
1091 : Publication *pub_elem;
1092 855 : List *pub_elem_tables = NIL;
1093 : ListCell *lc;
1094 :
1095 855 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1096 :
486 akapila 1097 ECB : /*
1098 : * Publications support partitioned tables. If
1099 : * publish_via_partition_root is false, all changes are replicated
1100 : * using leaf partition identity and schema, so we only need
1101 : * those. Otherwise, get the partitioned table itself.
1102 : */
11 akapila 1103 GNC 855 : if (pub_elem->alltables)
1104 125 : pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1105 : else
1106 : {
1107 : List *relids,
1108 : *schemarelids;
1109 :
1110 730 : relids = GetPublicationRelations(pub_elem->oid,
1111 730 : pub_elem->pubviaroot ?
1112 730 : PUBLICATION_PART_ROOT :
1113 : PUBLICATION_PART_LEAF);
1114 730 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1115 730 : pub_elem->pubviaroot ?
1116 730 : PUBLICATION_PART_ROOT :
1117 : PUBLICATION_PART_LEAF);
1118 730 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1119 : }
1120 :
1121 : /*
1122 : * Record the published table and the corresponding publication so
1123 : * that we can get row filters and column lists later.
1124 : *
1125 : * When a table is published by multiple publications, to obtain
1126 : * all row filters and column lists, the structure related to this
1127 : * table will be recorded multiple times.
1128 : */
1129 2688 : foreach(lc, pub_elem_tables)
1130 : {
1131 1833 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1132 :
1133 1833 : table_info->relid = lfirst_oid(lc);
1134 1833 : table_info->pubid = pub_elem->oid;
1135 1833 : table_infos = lappend(table_infos, table_info);
1136 : }
1137 :
1138 : /* At least one publication is using publish_via_partition_root. */
1139 855 : if (pub_elem->pubviaroot)
1140 168 : viaroot = true;
529 akapila 1141 ECB : }
1142 :
1143 : /*
1144 : * If the publication publishes partition changes via their respective
1145 : * root partitioned tables, we must exclude partitions in favor of
1146 : * including the root partitioned tables. Otherwise, the function
1147 : * could return both the child and parent tables which could cause
1148 : * data of the child table to be double-published on the subscriber
1149 : * side.
1150 : */
11 akapila 1151 GNC 811 : if (viaroot)
1152 160 : filter_partitions(table_infos);
1153 :
1154 : /* Construct a tuple descriptor for the result rows. */
290 michael 1155 GIC 811 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
11 akapila 1156 GNC 811 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1157 : OIDOID, -1, 0);
1158 811 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1159 : OIDOID, -1, 0);
1160 811 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1161 : INT2VECTOROID, -1, 0);
1162 811 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1163 : PG_NODE_TREEOID, -1, 0);
1164 :
325 akapila 1165 GIC 811 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
11 akapila 1166 GNC 811 : funcctx->user_fctx = (void *) table_infos;
2271 peter_e 1167 ECB :
2271 peter_e 1168 GIC 811 : MemoryContextSwitchTo(oldcontext);
1169 : }
2271 peter_e 1170 ECB :
1171 : /* stuff done on every call of the function */
2271 peter_e 1172 GIC 2604 : funcctx = SRF_PERCALL_SETUP();
11 akapila 1173 GNC 2604 : table_infos = (List *) funcctx->user_fctx;
1174 :
1175 2604 : if (funcctx->call_cntr < list_length(table_infos))
1176 : {
325 akapila 1177 CBC 1793 : HeapTuple pubtuple = NULL;
1178 : HeapTuple rettuple;
1179 : Publication *pub;
11 akapila 1180 GNC 1793 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1181 1793 : Oid relid = table_info->relid;
198 akapila 1182 CBC 1793 : Oid schemaid = get_rel_namespace(relid);
267 peter 1183 GNC 1793 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1184 1793 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
325 akapila 1185 ECB :
1186 : /*
1187 : * Form tuple with appropriate data.
1188 : */
1189 :
11 akapila 1190 GNC 1793 : pub = GetPublication(table_info->pubid);
1191 :
1192 1793 : values[0] = ObjectIdGetDatum(pub->oid);
1193 1793 : values[1] = ObjectIdGetDatum(relid);
1194 :
1195 : /*
198 akapila 1196 ECB : * We don't consider row filters or column lists for FOR ALL TABLES or
1197 : * FOR TABLES IN SCHEMA publications.
1198 : */
11 akapila 1199 GNC 1793 : if (!pub->alltables &&
198 akapila 1200 CBC 1138 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1201 : ObjectIdGetDatum(schemaid),
1202 : ObjectIdGetDatum(pub->oid)))
198 akapila 1203 GIC 1091 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1204 : ObjectIdGetDatum(relid),
1205 : ObjectIdGetDatum(pub->oid));
325 akapila 1206 ECB :
325 akapila 1207 GIC 1793 : if (HeapTupleIsValid(pubtuple))
325 akapila 1208 ECB : {
1209 : /* Lookup the column list attribute. */
11 akapila 1210 GNC 982 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1211 : Anum_pg_publication_rel_prattrs,
1212 : &(nulls[2]));
1213 :
1214 : /* Null indicates no filter. */
1215 982 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
325 akapila 1216 ECB : Anum_pg_publication_rel_prqual,
1217 : &(nulls[3]));
1218 : }
1219 : else
1220 : {
325 akapila 1221 GIC 811 : nulls[2] = true;
11 akapila 1222 GNC 811 : nulls[3] = true;
1223 : }
1224 :
1225 : /* Show all columns when the column list is not specified. */
1226 1793 : if (nulls[2])
1227 : {
86 1228 1675 : Relation rel = table_open(relid, AccessShareLock);
1229 1675 : int nattnums = 0;
1230 : int16 *attnums;
1231 1675 : TupleDesc desc = RelationGetDescr(rel);
1232 : int i;
1233 :
1234 1675 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1235 :
1236 4550 : for (i = 0; i < desc->natts; i++)
1237 : {
1238 2875 : Form_pg_attribute att = TupleDescAttr(desc, i);
1239 :
1240 2875 : if (att->attisdropped || att->attgenerated)
1241 17 : continue;
1242 :
1243 2858 : attnums[nattnums++] = att->attnum;
1244 : }
1245 :
1246 1675 : if (nattnums > 0)
1247 : {
11 1248 1675 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1249 1675 : nulls[2] = false;
1250 : }
1251 :
86 1252 1675 : table_close(rel, AccessShareLock);
86 akapila 1253 ECB : }
1254 :
325 akapila 1255 GIC 1793 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
2271 peter_e 1256 ECB :
325 akapila 1257 GIC 1793 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1258 : }
1259 :
2271 peter_e 1260 811 : SRF_RETURN_DONE(funcctx);
2271 peter_e 1261 ECB : }
|