Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_publication.c
4 : : * publication C API manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_publication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/catalog.h"
22 : : #include "catalog/dependency.h"
23 : : #include "catalog/indexing.h"
24 : : #include "catalog/namespace.h"
25 : : #include "catalog/objectaddress.h"
26 : : #include "catalog/partition.h"
27 : : #include "catalog/pg_inherits.h"
28 : : #include "catalog/pg_namespace.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "catalog/pg_type.h"
33 : : #include "commands/publicationcmds.h"
34 : : #include "funcapi.h"
35 : : #include "utils/array.h"
36 : : #include "utils/builtins.h"
37 : : #include "utils/catcache.h"
38 : : #include "utils/fmgroids.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : : #include "utils/syscache.h"
42 : :
43 : : /* Records association between publication and published table */
44 : : typedef struct
45 : : {
46 : : Oid relid; /* OID of published table */
47 : : Oid pubid; /* OID of publication that publishes this
48 : : * table. */
49 : : } published_rel;
50 : :
51 : : static void publication_translate_columns(Relation targetrel, List *columns,
52 : : int *natts, AttrNumber **attrs);
53 : :
54 : : /*
55 : : * Check if relation can be in given publication and throws appropriate
56 : : * error if not.
57 : : */
58 : : static void
2642 peter_e@gmx.net 59 :CBC 493 : check_publication_add_relation(Relation targetrel)
60 : : {
61 : : /* Must be a regular or partitioned table */
1496 peter@eisentraut.org 62 [ + + ]: 493 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
738 tomas.vondra@postgre 63 [ + + ]: 71 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
2642 peter_e@gmx.net 64 [ + - ]: 7 : ereport(ERROR,
65 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 : : errmsg("cannot add relation \"%s\" to publication",
67 : : RelationGetRelationName(targetrel)),
68 : : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
69 : :
70 : : /* Can't be system table */
71 [ + + ]: 486 : if (IsCatalogRelation(targetrel))
72 [ + - ]: 3 : ereport(ERROR,
73 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 : : errmsg("cannot add relation \"%s\" to publication",
75 : : RelationGetRelationName(targetrel)),
76 : : errdetail("This operation is not supported for system tables.")));
77 : :
78 : : /* UNLOGGED and TEMP relations cannot be part of publication. */
879 dgustafsson@postgres 79 [ + + ]: 483 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
2642 peter_e@gmx.net 80 [ + - ]: 3 : ereport(ERROR,
81 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 : : errmsg("cannot add relation \"%s\" to publication",
83 : : RelationGetRelationName(targetrel)),
84 : : errdetail("This operation is not supported for temporary tables.")));
879 dgustafsson@postgres 85 [ + + ]: 480 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
86 [ + - ]: 3 : ereport(ERROR,
87 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
88 : : errmsg("cannot add relation \"%s\" to publication",
89 : : RelationGetRelationName(targetrel)),
90 : : errdetail("This operation is not supported for unlogged tables.")));
2642 peter_e@gmx.net 91 : 477 : }
92 : :
93 : : /*
94 : : * Check if schema can be in given publication and throw appropriate error if
95 : : * not.
96 : : */
97 : : static void
900 akapila@postgresql.o 98 : 102 : check_publication_add_schema(Oid schemaid)
99 : : {
100 : : /* Can't be system namespace */
101 [ + + - + ]: 102 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
102 [ + - ]: 3 : ereport(ERROR,
103 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
104 : : errmsg("cannot add schema \"%s\" to publication",
105 : : get_namespace_name(schemaid)),
106 : : errdetail("This operation is not supported for system schemas.")));
107 : :
108 : : /* Can't be temporary namespace */
109 [ - + ]: 99 : if (isAnyTempNamespace(schemaid))
900 akapila@postgresql.o 110 [ # # ]:UBC 0 : ereport(ERROR,
111 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 : : errmsg("cannot add schema \"%s\" to publication",
113 : : get_namespace_name(schemaid)),
114 : : errdetail("Temporary schemas cannot be replicated.")));
900 akapila@postgresql.o 115 :CBC 99 : }
116 : :
117 : : /*
118 : : * Returns if relation represented by oid and Form_pg_class entry
119 : : * is publishable.
120 : : *
121 : : * Does same checks as check_publication_add_relation() above, but does not
122 : : * need relation to be opened and also does not throw errors.
123 : : *
124 : : * XXX This also excludes all tables with relid < FirstNormalObjectId,
125 : : * ie all tables created during initdb. This mainly affects the preinstalled
126 : : * information_schema. IsCatalogRelationOid() only excludes tables with
127 : : * relid < FirstUnpinnedObjectId, making that test rather redundant,
128 : : * but really we should get rid of the FirstNormalObjectId test not
129 : : * IsCatalogRelationOid. We can't do so today because we don't want
130 : : * information_schema tables to be considered publishable; but this test
131 : : * is really inadequate for that, since the information_schema could be
132 : : * dropped and reloaded and then it'll be considered publishable. The best
133 : : * long-term solution may be to add a "relispublishable" bool to pg_class,
134 : : * and depend on that instead of OID checks.
135 : : */
136 : : static bool
2642 peter_e@gmx.net 137 : 304676 : is_publishable_class(Oid relid, Form_pg_class reltuple)
138 : : {
1496 peter@eisentraut.org 139 : 311027 : return (reltuple->relkind == RELKIND_RELATION ||
738 tomas.vondra@postgre 140 [ + + ]: 6351 : reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
1803 tgl@sss.pgh.pa.us 141 [ + + ]: 298964 : !IsCatalogRelationOid(relid) &&
2642 peter_e@gmx.net 142 [ + + + + : 609352 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
+ + ]
143 : : relid >= FirstNormalObjectId;
144 : : }
145 : :
146 : : /*
147 : : * Another variant of is_publishable_class(), taking a Relation.
148 : : */
149 : : bool
625 akapila@postgresql.o 150 : 278320 : is_publishable_relation(Relation rel)
151 : : {
152 : 278320 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
153 : : }
154 : :
155 : : /*
156 : : * SQL-callable variant of the above
157 : : *
158 : : * This returns null when the relation does not exist. This is intended to be
159 : : * used for example in psql to avoid gratuitous errors when there are
160 : : * concurrent catalog changes.
161 : : */
162 : : Datum
621 163 : 2610 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
164 : : {
165 : 2610 : Oid relid = PG_GETARG_OID(0);
166 : : HeapTuple tuple;
167 : : bool result;
168 : :
169 : 2610 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
170 [ - + ]: 2610 : if (!HeapTupleIsValid(tuple))
621 akapila@postgresql.o 171 :UBC 0 : PG_RETURN_NULL();
621 akapila@postgresql.o 172 :CBC 2610 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
173 : 2610 : ReleaseSysCache(tuple);
174 : 2610 : PG_RETURN_BOOL(result);
175 : : }
176 : :
177 : : /*
178 : : * Returns true if the ancestor is in the list of published relations.
179 : : * Otherwise, returns false.
180 : : */
181 : : static bool
382 182 : 99 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
183 : : {
184 : : ListCell *lc;
185 : :
186 [ + - + + : 339 : foreach(lc, table_infos)
+ + ]
187 : : {
188 : 280 : Oid relid = ((published_rel *) lfirst(lc))->relid;
189 : :
190 [ + + ]: 280 : if (relid == ancestor)
191 : 40 : return true;
192 : : }
193 : :
194 : 59 : return false;
195 : : }
196 : :
197 : : /*
198 : : * Filter out the partitions whose parent tables are also present in the list.
199 : : */
200 : : static void
201 : 168 : filter_partitions(List *table_infos)
202 : : {
203 : : ListCell *lc;
204 : :
205 [ + - + + : 499 : foreach(lc, table_infos)
+ + ]
206 : : {
900 207 : 331 : bool skip = false;
208 : 331 : List *ancestors = NIL;
209 : : ListCell *lc2;
382 210 : 331 : published_rel *table_info = (published_rel *) lfirst(lc);
211 : :
212 [ + + ]: 331 : if (get_rel_relispartition(table_info->relid))
213 : 99 : ancestors = get_partition_ancestors(table_info->relid);
214 : :
900 215 [ + + + + : 390 : foreach(lc2, ancestors)
+ + ]
216 : : {
217 : 99 : Oid ancestor = lfirst_oid(lc2);
218 : :
382 219 [ + + ]: 99 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
220 : : {
900 221 : 40 : skip = true;
222 : 40 : break;
223 : : }
224 : : }
225 : :
382 226 [ + + ]: 331 : if (skip)
227 : 40 : table_infos = foreach_delete_current(table_infos, lc);
228 : : }
900 229 : 168 : }
230 : :
231 : : /*
232 : : * Returns true if any schema is associated with the publication, false if no
233 : : * schema is associated with the publication.
234 : : */
235 : : bool
858 236 : 128 : is_schema_publication(Oid pubid)
237 : : {
238 : : Relation pubschsrel;
239 : : ScanKeyData scankey;
240 : : SysScanDesc scan;
241 : : HeapTuple tup;
242 : 128 : bool result = false;
243 : :
244 : 128 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
245 : 128 : ScanKeyInit(&scankey,
246 : : Anum_pg_publication_namespace_pnpubid,
247 : : BTEqualStrategyNumber, F_OIDEQ,
248 : : ObjectIdGetDatum(pubid));
249 : :
250 : 128 : scan = systable_beginscan(pubschsrel,
251 : : PublicationNamespacePnnspidPnpubidIndexId,
252 : : true, NULL, 1, &scankey);
253 : 128 : tup = systable_getnext(scan);
254 : 128 : result = HeapTupleIsValid(tup);
255 : :
256 : 128 : systable_endscan(scan);
257 : 128 : table_close(pubschsrel, AccessShareLock);
258 : :
259 : 128 : return result;
260 : : }
261 : :
262 : : /*
263 : : * Gets the relations based on the publication partition option for a specified
264 : : * relation.
265 : : */
266 : : List *
935 267 : 2599 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
268 : : Oid relid)
269 : : {
270 [ + + + + ]: 2599 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
271 : : pub_partopt != PUBLICATION_PART_ROOT)
272 : 516 : {
273 : 516 : List *all_parts = find_all_inheritors(relid, NoLock,
274 : : NULL);
275 : :
276 [ + + ]: 516 : if (pub_partopt == PUBLICATION_PART_ALL)
277 : 419 : result = list_concat(result, all_parts);
278 [ + - ]: 97 : else if (pub_partopt == PUBLICATION_PART_LEAF)
279 : : {
280 : : ListCell *lc;
281 : :
282 [ + - + + : 355 : foreach(lc, all_parts)
+ + ]
283 : : {
284 : 258 : Oid partOid = lfirst_oid(lc);
285 : :
286 [ + + ]: 258 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
287 : 161 : result = lappend_oid(result, partOid);
288 : : }
289 : : }
290 : : else
935 akapila@postgresql.o 291 :UBC 0 : Assert(false);
292 : : }
293 : : else
935 akapila@postgresql.o 294 :CBC 2083 : result = lappend_oid(result, relid);
295 : :
296 : 2599 : return result;
297 : : }
298 : :
299 : : /*
300 : : * Returns the relid of the topmost ancestor that is published via this
301 : : * publication if any and set its ancestor level to ancestor_level,
302 : : * otherwise returns InvalidOid.
303 : : *
304 : : * The ancestor_level value allows us to compare the results for multiple
305 : : * publications, and decide which value is higher up.
306 : : *
307 : : * Note that the list of ancestors should be ordered such that the topmost
308 : : * ancestor is at the end of the list.
309 : : */
310 : : Oid
760 tomas.vondra@postgre 311 : 252 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
312 : : {
313 : : ListCell *lc;
782 akapila@postgresql.o 314 : 252 : Oid topmost_relid = InvalidOid;
760 tomas.vondra@postgre 315 : 252 : int level = 0;
316 : :
317 : : /*
318 : : * Find the "topmost" ancestor that is in this publication.
319 : : */
782 akapila@postgresql.o 320 [ + - + + : 513 : foreach(lc, ancestors)
+ + ]
321 : : {
322 : 261 : Oid ancestor = lfirst_oid(lc);
323 : 261 : List *apubids = GetRelationPublications(ancestor);
324 : 261 : List *aschemaPubids = NIL;
325 : :
760 tomas.vondra@postgre 326 : 261 : level++;
327 : :
782 akapila@postgresql.o 328 [ + + ]: 261 : if (list_member_oid(apubids, puboid))
329 : : {
330 : 154 : topmost_relid = ancestor;
331 : :
760 tomas.vondra@postgre 332 [ + + ]: 154 : if (ancestor_level)
333 : 43 : *ancestor_level = level;
334 : : }
335 : : else
336 : : {
738 337 : 107 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
782 akapila@postgresql.o 338 [ + + ]: 107 : if (list_member_oid(aschemaPubids, puboid))
339 : : {
340 : 5 : topmost_relid = ancestor;
341 : :
760 tomas.vondra@postgre 342 [ + - ]: 5 : if (ancestor_level)
343 : 5 : *ancestor_level = level;
344 : : }
345 : : }
346 : :
782 akapila@postgresql.o 347 : 261 : list_free(apubids);
348 : 261 : list_free(aschemaPubids);
349 : : }
350 : :
351 : 252 : return topmost_relid;
352 : : }
353 : :
354 : : /*
355 : : * Insert new publication / relation mapping.
356 : : */
357 : : ObjectAddress
358 : 504 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
359 : : bool if_not_exists)
360 : : {
361 : : Relation rel;
362 : : HeapTuple tup;
363 : : Datum values[Natts_pg_publication_rel];
364 : : bool nulls[Natts_pg_publication_rel];
365 : 504 : Relation targetrel = pri->relation;
366 : 504 : Oid relid = RelationGetRelid(targetrel);
367 : : Oid pubreloid;
2642 peter_e@gmx.net 368 : 504 : Publication *pub = GetPublication(pubid);
750 tomas.vondra@postgre 369 : 504 : AttrNumber *attarray = NULL;
370 : 504 : int natts = 0;
371 : : ObjectAddress myself,
372 : : referenced;
935 akapila@postgresql.o 373 : 504 : List *relids = NIL;
374 : :
1910 andres@anarazel.de 375 : 504 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
376 : :
377 : : /*
378 : : * Check for duplicates. Note that this does not really prevent
379 : : * duplicates, it's here just to provide nicer error message in common
380 : : * case. The real protection is the unique key on the catalog.
381 : : */
2642 peter_e@gmx.net 382 [ + + ]: 504 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
383 : : ObjectIdGetDatum(pubid)))
384 : : {
1910 andres@anarazel.de 385 : 11 : table_close(rel, RowExclusiveLock);
386 : :
2642 peter_e@gmx.net 387 [ + + ]: 11 : if (if_not_exists)
388 : 8 : return InvalidObjectAddress;
389 : :
390 [ + - ]: 3 : ereport(ERROR,
391 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
392 : : errmsg("relation \"%s\" is already member of publication \"%s\"",
393 : : RelationGetRelationName(targetrel), pub->name)));
394 : : }
395 : :
782 akapila@postgresql.o 396 : 493 : check_publication_add_relation(targetrel);
397 : :
398 : : /*
399 : : * Translate column names to attnums and make sure the column list
400 : : * contains only allowed elements (no system or generated columns etc.).
401 : : * Also build an array of attnums, for storing in the catalog.
402 : : */
750 tomas.vondra@postgre 403 : 477 : publication_translate_columns(pri->relation, pri->columns,
404 : : &natts, &attarray);
405 : :
406 : : /* Form a tuple. */
2642 peter_e@gmx.net 407 : 468 : memset(values, 0, sizeof(values));
408 : 468 : memset(nulls, false, sizeof(nulls));
409 : :
836 alvherre@alvh.no-ip. 410 : 468 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
411 : : Anum_pg_publication_rel_oid);
412 : 468 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
2642 peter_e@gmx.net 413 : 468 : values[Anum_pg_publication_rel_prpubid - 1] =
414 : 468 : ObjectIdGetDatum(pubid);
415 : 468 : values[Anum_pg_publication_rel_prrelid - 1] =
416 : 468 : ObjectIdGetDatum(relid);
417 : :
418 : : /* Add qualifications, if available */
782 akapila@postgresql.o 419 [ + + ]: 468 : if (pri->whereClause != NULL)
420 : 149 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
421 : : else
422 : 319 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
423 : :
424 : : /* Add column list, if available */
750 tomas.vondra@postgre 425 [ + + ]: 468 : if (pri->columns)
426 : 136 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
427 : : else
428 : 332 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
429 : :
2642 peter_e@gmx.net 430 : 468 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
431 : :
432 : : /* Insert tuple into catalog. */
1972 andres@anarazel.de 433 : 468 : CatalogTupleInsert(rel, tup);
2642 peter_e@gmx.net 434 : 468 : heap_freetuple(tup);
435 : :
436 : : /* Register dependencies as needed */
836 alvherre@alvh.no-ip. 437 : 468 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
438 : :
439 : : /* Add dependency on the publication */
2642 peter_e@gmx.net 440 : 468 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
441 : 468 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
442 : :
443 : : /* Add dependency on the relation */
444 : 468 : ObjectAddressSet(referenced, RelationRelationId, relid);
445 : 468 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
446 : :
447 : : /* Add dependency on the objects mentioned in the qualifications */
782 akapila@postgresql.o 448 [ + + ]: 468 : if (pri->whereClause)
449 : 149 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
450 : : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
451 : : false);
452 : :
453 : : /* Add dependency on the columns, if any are listed */
750 tomas.vondra@postgre 454 [ + + ]: 706 : for (int i = 0; i < natts; i++)
455 : : {
456 : 238 : ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
457 : 238 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
458 : : }
459 : :
460 : : /* Close the table. */
1910 andres@anarazel.de 461 : 468 : table_close(rel, RowExclusiveLock);
462 : :
463 : : /*
464 : : * Invalidate relcache so that publication info is rebuilt.
465 : : *
466 : : * For the partitioned tables, we must invalidate all partitions contained
467 : : * in the respective partition hierarchies, not just the one explicitly
468 : : * mentioned in the publication. This is required because we implicitly
469 : : * publish the child tables when the parent table is published.
470 : : */
935 akapila@postgresql.o 471 : 468 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
472 : : relid);
473 : :
474 : 468 : InvalidatePublicationRels(relids);
475 : :
2642 peter_e@gmx.net 476 : 468 : return myself;
477 : : }
478 : :
479 : : /* qsort comparator for attnums */
480 : : static int
750 tomas.vondra@postgre 481 : 102 : compare_int16(const void *a, const void *b)
482 : : {
483 : 102 : int av = *(const int16 *) a;
484 : 102 : int bv = *(const int16 *) b;
485 : :
486 : : /* this can't overflow if int is wider than int16 */
487 : 102 : return (av - bv);
488 : : }
489 : :
490 : : /*
491 : : * Translate a list of column names to an array of attribute numbers
492 : : * and a Bitmapset with them; verify that each attribute is appropriate
493 : : * to have in a publication column list (no system or generated attributes,
494 : : * no duplicates). Additional checks with replica identity are done later;
495 : : * see pub_collist_contains_invalid_column.
496 : : *
497 : : * Note that the attribute numbers are *not* offset by
498 : : * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
499 : : * is okay.
500 : : */
501 : : static void
502 : 477 : publication_translate_columns(Relation targetrel, List *columns,
503 : : int *natts, AttrNumber **attrs)
504 : : {
505 : 477 : AttrNumber *attarray = NULL;
506 : 477 : Bitmapset *set = NULL;
507 : : ListCell *lc;
508 : 477 : int n = 0;
509 : 477 : TupleDesc tupdesc = RelationGetDescr(targetrel);
510 : :
511 : : /* Bail out when no column list defined. */
512 [ + + ]: 477 : if (!columns)
513 : 332 : return;
514 : :
515 : : /*
516 : : * Translate list of columns to attnums. We prohibit system attributes and
517 : : * make sure there are no duplicate columns.
518 : : */
519 : 145 : attarray = palloc(sizeof(AttrNumber) * list_length(columns));
520 [ + - + + : 392 : foreach(lc, columns)
+ + ]
521 : : {
522 : 256 : char *colname = strVal(lfirst(lc));
523 : 256 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
524 : :
525 [ + + ]: 256 : if (attnum == InvalidAttrNumber)
526 [ + - ]: 3 : ereport(ERROR,
527 : : errcode(ERRCODE_UNDEFINED_COLUMN),
528 : : errmsg("column \"%s\" of relation \"%s\" does not exist",
529 : : colname, RelationGetRelationName(targetrel)));
530 : :
531 [ + + ]: 253 : if (!AttrNumberIsForUserDefinedAttr(attnum))
532 [ + - ]: 3 : ereport(ERROR,
533 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
534 : : errmsg("cannot use system column \"%s\" in publication column list",
535 : : colname));
536 : :
537 [ + + ]: 250 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
538 [ + - ]: 3 : ereport(ERROR,
539 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
540 : : errmsg("cannot use generated column \"%s\" in publication column list",
541 : : colname));
542 : :
543 [ - + ]: 247 : if (bms_is_member(attnum, set))
750 tomas.vondra@postgre 544 [ # # ]:UBC 0 : ereport(ERROR,
545 : : errcode(ERRCODE_DUPLICATE_OBJECT),
546 : : errmsg("duplicate column \"%s\" in publication column list",
547 : : colname));
548 : :
750 tomas.vondra@postgre 549 :CBC 247 : set = bms_add_member(set, attnum);
550 : 247 : attarray[n++] = attnum;
551 : : }
552 : :
553 : : /* Be tidy, so that the catalog representation is always sorted */
554 : 136 : qsort(attarray, n, sizeof(AttrNumber), compare_int16);
555 : :
556 : 136 : *natts = n;
557 : 136 : *attrs = attarray;
558 : :
559 : 136 : bms_free(set);
560 : : }
561 : :
562 : : /*
563 : : * Transform a column list (represented by an array Datum) to a bitmapset.
564 : : *
565 : : * If columns isn't NULL, add the column numbers to that set.
566 : : *
567 : : * If mcxt isn't NULL, build the bitmapset in that context.
568 : : */
569 : : Bitmapset *
570 : 214 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
571 : : {
572 : 214 : Bitmapset *result = NULL;
573 : : ArrayType *arr;
574 : : int nelems;
575 : : int16 *elems;
703 tgl@sss.pgh.pa.us 576 : 214 : MemoryContext oldcxt = NULL;
577 : :
578 : : /*
579 : : * If an existing bitmap was provided, use it. Otherwise just use NULL and
580 : : * build a new bitmap.
581 : : */
750 tomas.vondra@postgre 582 [ - + ]: 214 : if (columns)
750 tomas.vondra@postgre 583 :UBC 0 : result = columns;
584 : :
750 tomas.vondra@postgre 585 :CBC 214 : arr = DatumGetArrayTypeP(pubcols);
586 : 214 : nelems = ARR_DIMS(arr)[0];
587 [ - + ]: 214 : elems = (int16 *) ARR_DATA_PTR(arr);
588 : :
589 : : /* If a memory context was specified, switch to it. */
590 [ + + ]: 214 : if (mcxt)
591 : 37 : oldcxt = MemoryContextSwitchTo(mcxt);
592 : :
593 [ + + ]: 594 : for (int i = 0; i < nelems; i++)
594 : 380 : result = bms_add_member(result, elems[i]);
595 : :
596 [ + + ]: 214 : if (mcxt)
597 : 37 : MemoryContextSwitchTo(oldcxt);
598 : :
599 : 214 : return result;
600 : : }
601 : :
602 : : /*
603 : : * Insert new publication / schema mapping.
604 : : */
605 : : ObjectAddress
738 606 : 111 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
607 : : {
608 : : Relation rel;
609 : : HeapTuple tup;
610 : : Datum values[Natts_pg_publication_namespace];
611 : : bool nulls[Natts_pg_publication_namespace];
612 : : Oid psschid;
900 akapila@postgresql.o 613 : 111 : Publication *pub = GetPublication(pubid);
614 : 111 : List *schemaRels = NIL;
615 : : ObjectAddress myself,
616 : : referenced;
617 : :
618 : 111 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
619 : :
620 : : /*
621 : : * Check for duplicates. Note that this does not really prevent
622 : : * duplicates, it's here just to provide nicer error message in common
623 : : * case. The real protection is the unique key on the catalog.
624 : : */
738 tomas.vondra@postgre 625 [ + + ]: 111 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
626 : : ObjectIdGetDatum(schemaid),
627 : : ObjectIdGetDatum(pubid)))
628 : : {
900 akapila@postgresql.o 629 : 9 : table_close(rel, RowExclusiveLock);
630 : :
631 [ + + ]: 9 : if (if_not_exists)
632 : 6 : return InvalidObjectAddress;
633 : :
634 [ + - ]: 3 : ereport(ERROR,
635 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
636 : : errmsg("schema \"%s\" is already member of publication \"%s\"",
637 : : get_namespace_name(schemaid), pub->name)));
638 : : }
639 : :
640 : 102 : check_publication_add_schema(schemaid);
641 : :
642 : : /* Form a tuple */
643 : 99 : memset(values, 0, sizeof(values));
644 : 99 : memset(nulls, false, sizeof(nulls));
645 : :
646 : 99 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
647 : : Anum_pg_publication_namespace_oid);
648 : 99 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
649 : 99 : values[Anum_pg_publication_namespace_pnpubid - 1] =
650 : 99 : ObjectIdGetDatum(pubid);
651 : 99 : values[Anum_pg_publication_namespace_pnnspid - 1] =
652 : 99 : ObjectIdGetDatum(schemaid);
653 : :
654 : 99 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
655 : :
656 : : /* Insert tuple into catalog */
657 : 99 : CatalogTupleInsert(rel, tup);
658 : 99 : heap_freetuple(tup);
659 : :
660 : 99 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
661 : :
662 : : /* Add dependency on the publication */
663 : 99 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
664 : 99 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
665 : :
666 : : /* Add dependency on the schema */
667 : 99 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
668 : 99 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
669 : :
670 : : /* Close the table */
671 : 99 : table_close(rel, RowExclusiveLock);
672 : :
673 : : /*
674 : : * Invalidate relcache so that publication info is rebuilt. See
675 : : * publication_add_relation for why we need to consider all the
676 : : * partitions.
677 : : */
738 tomas.vondra@postgre 678 : 99 : schemaRels = GetSchemaPublicationRelations(schemaid,
679 : : PUBLICATION_PART_ALL);
900 akapila@postgresql.o 680 : 99 : InvalidatePublicationRels(schemaRels);
681 : :
682 : 99 : return myself;
683 : : }
684 : :
685 : : /* Gets list of publication oids for a relation */
686 : : List *
2642 peter_e@gmx.net 687 : 5689 : GetRelationPublications(Oid relid)
688 : : {
2524 bruce@momjian.us 689 : 5689 : List *result = NIL;
690 : : CatCList *pubrellist;
691 : : int i;
692 : :
693 : : /* Find all publications associated with the relation. */
2642 peter_e@gmx.net 694 : 5689 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
695 : : ObjectIdGetDatum(relid));
696 [ + + ]: 6466 : for (i = 0; i < pubrellist->n_members; i++)
697 : : {
698 : 777 : HeapTuple tup = &pubrellist->members[i]->tuple;
699 : 777 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
700 : :
701 : 777 : result = lappend_oid(result, pubid);
702 : : }
703 : :
704 : 5689 : ReleaseSysCacheList(pubrellist);
705 : :
706 : 5689 : return result;
707 : : }
708 : :
709 : : /*
710 : : * Gets list of relation oids for a publication.
711 : : *
712 : : * This should only be used FOR TABLE publications, the FOR ALL TABLES
713 : : * should use GetAllTablesPublicationRelations().
714 : : */
715 : : List *
738 tomas.vondra@postgre 716 : 1037 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
717 : : {
718 : : List *result;
719 : : Relation pubrelsrel;
720 : : ScanKeyData scankey;
721 : : SysScanDesc scan;
722 : : HeapTuple tup;
723 : :
724 : : /* Find all publications associated with the relation. */
1910 andres@anarazel.de 725 : 1037 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
726 : :
2642 peter_e@gmx.net 727 : 1037 : ScanKeyInit(&scankey,
728 : : Anum_pg_publication_rel_prpubid,
729 : : BTEqualStrategyNumber, F_OIDEQ,
730 : : ObjectIdGetDatum(pubid));
731 : :
823 alvherre@alvh.no-ip. 732 : 1037 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
733 : : true, NULL, 1, &scankey);
734 : :
2642 peter_e@gmx.net 735 : 1037 : result = NIL;
736 [ + + ]: 2451 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
737 : : {
738 : : Form_pg_publication_rel pubrel;
739 : :
740 : 1414 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
738 tomas.vondra@postgre 741 : 1414 : result = GetPubPartitionOptionRelations(result, pub_partopt,
742 : : pubrel->prrelid);
743 : : }
744 : :
2642 peter_e@gmx.net 745 : 1037 : systable_endscan(scan);
1910 andres@anarazel.de 746 : 1037 : table_close(pubrelsrel, AccessShareLock);
747 : :
748 : : /* Now sort and de-duplicate the result list */
858 akapila@postgresql.o 749 : 1037 : list_sort(result, list_oid_cmp);
750 : 1037 : list_deduplicate_oid(result);
751 : :
2642 peter_e@gmx.net 752 : 1037 : return result;
753 : : }
754 : :
755 : : /*
756 : : * Gets list of publication oids for publications marked as FOR ALL TABLES.
757 : : */
758 : : List *
759 : 3857 : GetAllTablesPublications(void)
760 : : {
761 : : List *result;
762 : : Relation rel;
763 : : ScanKeyData scankey;
764 : : SysScanDesc scan;
765 : : HeapTuple tup;
766 : :
767 : : /* Find all publications that are marked as for all tables. */
1910 andres@anarazel.de 768 : 3857 : rel = table_open(PublicationRelationId, AccessShareLock);
769 : :
2642 peter_e@gmx.net 770 : 3857 : ScanKeyInit(&scankey,
771 : : Anum_pg_publication_puballtables,
772 : : BTEqualStrategyNumber, F_BOOLEQ,
773 : : BoolGetDatum(true));
774 : :
775 : 3857 : scan = systable_beginscan(rel, InvalidOid, false,
776 : : NULL, 1, &scankey);
777 : :
778 : 3857 : result = NIL;
779 [ + + ]: 3930 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
780 : : {
1789 tgl@sss.pgh.pa.us 781 : 73 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
782 : :
1972 andres@anarazel.de 783 : 73 : result = lappend_oid(result, oid);
784 : : }
785 : :
2642 peter_e@gmx.net 786 : 3857 : systable_endscan(scan);
1910 andres@anarazel.de 787 : 3857 : table_close(rel, AccessShareLock);
788 : :
2642 peter_e@gmx.net 789 : 3857 : return result;
790 : : }
791 : :
792 : : /*
793 : : * Gets list of all relation published by FOR ALL TABLES publication(s).
794 : : *
795 : : * If the publication publishes partition changes via their respective root
796 : : * partitioned tables, we must exclude partitions in favor of including the
797 : : * root partitioned tables.
798 : : */
799 : : List *
1467 peter@eisentraut.org 800 : 140 : GetAllTablesPublicationRelations(bool pubviaroot)
801 : : {
802 : : Relation classRel;
803 : : ScanKeyData key[1];
804 : : TableScanDesc scan;
805 : : HeapTuple tuple;
2642 peter_e@gmx.net 806 : 140 : List *result = NIL;
807 : :
1910 andres@anarazel.de 808 : 140 : classRel = table_open(RelationRelationId, AccessShareLock);
809 : :
2642 peter_e@gmx.net 810 : 140 : ScanKeyInit(&key[0],
811 : : Anum_pg_class_relkind,
812 : : BTEqualStrategyNumber, F_CHAREQ,
813 : : CharGetDatum(RELKIND_RELATION));
814 : :
1861 andres@anarazel.de 815 : 140 : scan = table_beginscan_catalog(classRel, 1, key);
816 : :
2642 peter_e@gmx.net 817 [ + + ]: 10373 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
818 : : {
2524 bruce@momjian.us 819 : 10233 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1972 andres@anarazel.de 820 : 10233 : Oid relid = relForm->oid;
821 : :
1467 peter@eisentraut.org 822 [ + + ]: 10233 : if (is_publishable_class(relid, relForm) &&
823 [ + + + + ]: 707 : !(relForm->relispartition && pubviaroot))
2642 peter_e@gmx.net 824 : 616 : result = lappend_oid(result, relid);
825 : : }
826 : :
1861 andres@anarazel.de 827 : 140 : table_endscan(scan);
828 : :
1467 peter@eisentraut.org 829 [ + + ]: 140 : if (pubviaroot)
830 : : {
831 : 13 : ScanKeyInit(&key[0],
832 : : Anum_pg_class_relkind,
833 : : BTEqualStrategyNumber, F_CHAREQ,
834 : : CharGetDatum(RELKIND_PARTITIONED_TABLE));
835 : :
836 : 13 : scan = table_beginscan_catalog(classRel, 1, key);
837 : :
838 [ + + ]: 78 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
839 : : {
840 : 65 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
841 : 65 : Oid relid = relForm->oid;
842 : :
843 [ + - ]: 65 : if (is_publishable_class(relid, relForm) &&
844 [ + + ]: 65 : !relForm->relispartition)
845 : 52 : result = lappend_oid(result, relid);
846 : : }
847 : :
848 : 13 : table_endscan(scan);
849 : : }
850 : :
1464 851 : 140 : table_close(classRel, AccessShareLock);
2642 peter_e@gmx.net 852 : 140 : return result;
853 : : }
854 : :
855 : : /*
856 : : * Gets the list of schema oids for a publication.
857 : : *
858 : : * This should only be used FOR TABLES IN SCHEMA publications.
859 : : */
860 : : List *
738 tomas.vondra@postgre 861 : 1002 : GetPublicationSchemas(Oid pubid)
862 : : {
900 akapila@postgresql.o 863 : 1002 : List *result = NIL;
864 : : Relation pubschsrel;
865 : : ScanKeyData scankey;
866 : : SysScanDesc scan;
867 : : HeapTuple tup;
868 : :
869 : : /* Find all schemas associated with the publication */
870 : 1002 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
871 : :
738 tomas.vondra@postgre 872 : 1002 : ScanKeyInit(&scankey,
873 : : Anum_pg_publication_namespace_pnpubid,
874 : : BTEqualStrategyNumber, F_OIDEQ,
875 : : ObjectIdGetDatum(pubid));
876 : :
900 akapila@postgresql.o 877 : 1002 : scan = systable_beginscan(pubschsrel,
878 : : PublicationNamespacePnnspidPnpubidIndexId,
879 : : true, NULL, 1, &scankey);
880 [ + + ]: 1052 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
881 : : {
882 : : Form_pg_publication_namespace pubsch;
883 : :
884 : 50 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
885 : :
886 : 50 : result = lappend_oid(result, pubsch->pnnspid);
887 : : }
888 : :
889 : 1002 : systable_endscan(scan);
890 : 1002 : table_close(pubschsrel, AccessShareLock);
891 : :
892 : 1002 : return result;
893 : : }
894 : :
895 : : /*
896 : : * Gets the list of publication oids associated with a specified schema.
897 : : */
898 : : List *
738 tomas.vondra@postgre 899 : 5513 : GetSchemaPublications(Oid schemaid)
900 : : {
900 akapila@postgresql.o 901 : 5513 : List *result = NIL;
902 : : CatCList *pubschlist;
903 : : int i;
904 : :
905 : : /* Find all publications associated with the schema */
906 : 5513 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
907 : : ObjectIdGetDatum(schemaid));
908 [ + + ]: 5570 : for (i = 0; i < pubschlist->n_members; i++)
909 : : {
910 : 57 : HeapTuple tup = &pubschlist->members[i]->tuple;
911 : 57 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
912 : :
913 : 57 : result = lappend_oid(result, pubid);
914 : : }
915 : :
916 : 5513 : ReleaseSysCacheList(pubschlist);
917 : :
918 : 5513 : return result;
919 : : }
920 : :
921 : : /*
922 : : * Get the list of publishable relation oids for a specified schema.
923 : : */
924 : : List *
738 tomas.vondra@postgre 925 : 230 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
926 : : {
927 : : Relation classRel;
928 : : ScanKeyData key[1];
929 : : TableScanDesc scan;
930 : : HeapTuple tuple;
900 akapila@postgresql.o 931 : 230 : List *result = NIL;
932 : :
933 [ - + ]: 230 : Assert(OidIsValid(schemaid));
934 : :
935 : 230 : classRel = table_open(RelationRelationId, AccessShareLock);
936 : :
937 : 230 : ScanKeyInit(&key[0],
938 : : Anum_pg_class_relnamespace,
939 : : BTEqualStrategyNumber, F_OIDEQ,
940 : : schemaid);
941 : :
942 : : /* get all the relations present in the specified schema */
943 : 230 : scan = table_beginscan_catalog(classRel, 1, key);
944 [ + + ]: 13678 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
945 : : {
946 : 13448 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
947 : 13448 : Oid relid = relForm->oid;
948 : : char relkind;
949 : :
950 [ + + ]: 13448 : if (!is_publishable_class(relid, relForm))
951 : 5530 : continue;
952 : :
953 : 7918 : relkind = get_rel_relkind(relid);
738 tomas.vondra@postgre 954 [ + + ]: 7918 : if (relkind == RELKIND_RELATION)
955 : 7599 : result = lappend_oid(result, relid);
956 [ + - ]: 319 : else if (relkind == RELKIND_PARTITIONED_TABLE)
957 : : {
900 akapila@postgresql.o 958 : 319 : List *partitionrels = NIL;
959 : :
960 : : /*
961 : : * It is quite possible that some of the partitions are in a
962 : : * different schema than the parent table, so we need to get such
963 : : * partitions separately.
964 : : */
965 : 319 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
966 : : pub_partopt,
967 : : relForm->oid);
968 : 319 : result = list_concat_unique_oid(result, partitionrels);
969 : : }
970 : : }
971 : :
972 : 230 : table_endscan(scan);
973 : 230 : table_close(classRel, AccessShareLock);
974 : 230 : return result;
975 : : }
976 : :
977 : : /*
978 : : * Gets the list of all relations published by FOR TABLES IN SCHEMA
979 : : * publication.
980 : : */
981 : : List *
738 tomas.vondra@postgre 982 : 806 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
983 : : {
900 akapila@postgresql.o 984 : 806 : List *result = NIL;
738 tomas.vondra@postgre 985 : 806 : List *pubschemalist = GetPublicationSchemas(pubid);
986 : : ListCell *cell;
987 : :
900 akapila@postgresql.o 988 [ + + + + : 841 : foreach(cell, pubschemalist)
+ + ]
989 : : {
990 : 35 : Oid schemaid = lfirst_oid(cell);
991 : 35 : List *schemaRels = NIL;
992 : :
738 tomas.vondra@postgre 993 : 35 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
900 akapila@postgresql.o 994 : 35 : result = list_concat(result, schemaRels);
995 : : }
996 : :
997 : 806 : return result;
998 : : }
999 : :
1000 : : /*
1001 : : * Get publication using oid
1002 : : *
1003 : : * The Publication struct and its data are palloc'ed here.
1004 : : */
1005 : : Publication *
2642 peter_e@gmx.net 1006 : 3612 : GetPublication(Oid pubid)
1007 : : {
1008 : : HeapTuple tup;
1009 : : Publication *pub;
1010 : : Form_pg_publication pubform;
1011 : :
1012 : 3612 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1013 [ - + ]: 3612 : if (!HeapTupleIsValid(tup))
2642 peter_e@gmx.net 1014 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1015 : :
2642 peter_e@gmx.net 1016 :CBC 3612 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1017 : :
1018 : 3612 : pub = (Publication *) palloc(sizeof(Publication));
1019 : 3612 : pub->oid = pubid;
1020 : 3612 : pub->name = pstrdup(NameStr(pubform->pubname));
1021 : 3612 : pub->alltables = pubform->puballtables;
1022 : 3612 : pub->pubactions.pubinsert = pubform->pubinsert;
1023 : 3612 : pub->pubactions.pubupdate = pubform->pubupdate;
1024 : 3612 : pub->pubactions.pubdelete = pubform->pubdelete;
2199 1025 : 3612 : pub->pubactions.pubtruncate = pubform->pubtruncate;
1467 peter@eisentraut.org 1026 : 3612 : pub->pubviaroot = pubform->pubviaroot;
1027 : :
2642 peter_e@gmx.net 1028 : 3612 : ReleaseSysCache(tup);
1029 : :
1030 : 3612 : return pub;
1031 : : }
1032 : :
1033 : : /*
1034 : : * Get Publication using name.
1035 : : */
1036 : : Publication *
1037 : 1125 : GetPublicationByName(const char *pubname, bool missing_ok)
1038 : : {
1039 : : Oid oid;
1040 : :
1574 alvherre@alvh.no-ip. 1041 : 1125 : oid = get_publication_oid(pubname, missing_ok);
1042 : :
1043 [ + - ]: 1123 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1044 : : }
1045 : :
1046 : : /*
1047 : : * Get information of the tables in the given publication array.
1048 : : *
1049 : : * Returns pubid, relid, column list, row filter for each table.
1050 : : */
1051 : : Datum
2642 peter_e@gmx.net 1052 : 2732 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1053 : : {
1054 : : #define NUM_PUBLICATION_TABLES_ELEM 4
1055 : : FuncCallContext *funcctx;
382 akapila@postgresql.o 1056 : 2732 : List *table_infos = NIL;
1057 : :
1058 : : /* stuff done only on the first call of the function */
2642 peter_e@gmx.net 1059 [ + + ]: 2732 : if (SRF_IS_FIRSTCALL())
1060 : : {
1061 : : TupleDesc tupdesc;
1062 : : MemoryContext oldcontext;
1063 : : ArrayType *arr;
1064 : : Datum *elems;
1065 : : int nelems,
1066 : : i;
382 akapila@postgresql.o 1067 : 858 : bool viaroot = false;
1068 : :
1069 : : /* create a function context for cross-call persistence */
2642 peter_e@gmx.net 1070 : 858 : funcctx = SRF_FIRSTCALL_INIT();
1071 : :
1072 : : /* switch to memory context appropriate for multiple function calls */
1073 : 858 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1074 : :
1075 : : /*
1076 : : * Deconstruct the parameter into elements where each element is a
1077 : : * publication name.
1078 : : */
382 akapila@postgresql.o 1079 : 858 : arr = PG_GETARG_ARRAYTYPE_P(0);
1080 : 858 : deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
1081 : : &elems, NULL, &nelems);
1082 : :
1083 : : /* Get Oids of tables from each publication. */
1084 [ + + ]: 1759 : for (i = 0; i < nelems; i++)
1085 : : {
1086 : : Publication *pub_elem;
1087 : 901 : List *pub_elem_tables = NIL;
1088 : : ListCell *lc;
1089 : :
1090 : 901 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1091 : :
1092 : : /*
1093 : : * Publications support partitioned tables. If
1094 : : * publish_via_partition_root is false, all changes are replicated
1095 : : * using leaf partition identity and schema, so we only need
1096 : : * those. Otherwise, get the partitioned table itself.
1097 : : */
1098 [ + + ]: 901 : if (pub_elem->alltables)
1099 : 140 : pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
1100 : : else
1101 : : {
1102 : : List *relids,
1103 : : *schemarelids;
1104 : :
1105 : 761 : relids = GetPublicationRelations(pub_elem->oid,
1106 : 761 : pub_elem->pubviaroot ?
1107 : 761 : PUBLICATION_PART_ROOT :
1108 : : PUBLICATION_PART_LEAF);
1109 : 761 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1110 : 761 : pub_elem->pubviaroot ?
1111 : 761 : PUBLICATION_PART_ROOT :
1112 : : PUBLICATION_PART_LEAF);
1113 : 761 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1114 : : }
1115 : :
1116 : : /*
1117 : : * Record the published table and the corresponding publication so
1118 : : * that we can get row filters and column lists later.
1119 : : *
1120 : : * When a table is published by multiple publications, to obtain
1121 : : * all row filters and column lists, the structure related to this
1122 : : * table will be recorded multiple times.
1123 : : */
1124 [ + + + + : 2815 : foreach(lc, pub_elem_tables)
+ + ]
1125 : : {
1126 : 1914 : published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
1127 : :
1128 : 1914 : table_info->relid = lfirst_oid(lc);
1129 : 1914 : table_info->pubid = pub_elem->oid;
1130 : 1914 : table_infos = lappend(table_infos, table_info);
1131 : : }
1132 : :
1133 : : /* At least one publication is using publish_via_partition_root. */
1134 [ + + ]: 901 : if (pub_elem->pubviaroot)
1135 : 176 : viaroot = true;
1136 : : }
1137 : :
1138 : : /*
1139 : : * If the publication publishes partition changes via their respective
1140 : : * root partitioned tables, we must exclude partitions in favor of
1141 : : * including the root partitioned tables. Otherwise, the function
1142 : : * could return both the child and parent tables which could cause
1143 : : * data of the child table to be double-published on the subscriber
1144 : : * side.
1145 : : */
1146 [ + + ]: 858 : if (viaroot)
1147 : 168 : filter_partitions(table_infos);
1148 : :
1149 : : /* Construct a tuple descriptor for the result rows. */
661 michael@paquier.xyz 1150 : 858 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
382 akapila@postgresql.o 1151 : 858 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1152 : : OIDOID, -1, 0);
1153 : 858 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1154 : : OIDOID, -1, 0);
1155 : 858 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1156 : : INT2VECTOROID, -1, 0);
1157 : 858 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1158 : : PG_NODE_TREEOID, -1, 0);
1159 : :
696 1160 : 858 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
382 1161 : 858 : funcctx->user_fctx = (void *) table_infos;
1162 : :
2642 peter_e@gmx.net 1163 : 858 : MemoryContextSwitchTo(oldcontext);
1164 : : }
1165 : :
1166 : : /* stuff done on every call of the function */
1167 : 2732 : funcctx = SRF_PERCALL_SETUP();
382 akapila@postgresql.o 1168 : 2732 : table_infos = (List *) funcctx->user_fctx;
1169 : :
1170 [ + + ]: 2732 : if (funcctx->call_cntr < list_length(table_infos))
1171 : : {
696 1172 : 1874 : HeapTuple pubtuple = NULL;
1173 : : HeapTuple rettuple;
1174 : : Publication *pub;
382 1175 : 1874 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1176 : 1874 : Oid relid = table_info->relid;
569 1177 : 1874 : Oid schemaid = get_rel_namespace(relid);
638 peter@eisentraut.org 1178 : 1874 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1179 : 1874 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1180 : :
1181 : : /*
1182 : : * Form tuple with appropriate data.
1183 : : */
1184 : :
382 akapila@postgresql.o 1185 : 1874 : pub = GetPublication(table_info->pubid);
1186 : :
1187 : 1874 : values[0] = ObjectIdGetDatum(pub->oid);
1188 : 1874 : values[1] = ObjectIdGetDatum(relid);
1189 : :
1190 : : /*
1191 : : * We don't consider row filters or column lists for FOR ALL TABLES or
1192 : : * FOR TABLES IN SCHEMA publications.
1193 : : */
1194 [ + + ]: 1874 : if (!pub->alltables &&
569 1195 [ + + ]: 1206 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1196 : : ObjectIdGetDatum(schemaid),
1197 : : ObjectIdGetDatum(pub->oid)))
1198 : 1159 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1199 : : ObjectIdGetDatum(relid),
1200 : : ObjectIdGetDatum(pub->oid));
1201 : :
696 1202 [ + + ]: 1874 : if (HeapTupleIsValid(pubtuple))
1203 : : {
1204 : : /* Lookup the column list attribute. */
382 1205 : 1050 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1206 : : Anum_pg_publication_rel_prattrs,
1207 : : &(nulls[2]));
1208 : :
1209 : : /* Null indicates no filter. */
1210 : 1050 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1211 : : Anum_pg_publication_rel_prqual,
1212 : : &(nulls[3]));
1213 : : }
1214 : : else
1215 : : {
696 1216 : 824 : nulls[2] = true;
382 1217 : 824 : nulls[3] = true;
1218 : : }
1219 : :
1220 : : /* Show all columns when the column list is not specified. */
1221 [ + + ]: 1874 : if (nulls[2])
1222 : : {
457 1223 : 1757 : Relation rel = table_open(relid, AccessShareLock);
1224 : 1757 : int nattnums = 0;
1225 : : int16 *attnums;
1226 : 1757 : TupleDesc desc = RelationGetDescr(rel);
1227 : : int i;
1228 : :
1229 : 1757 : attnums = (int16 *) palloc(desc->natts * sizeof(int16));
1230 : :
1231 [ + + ]: 4704 : for (i = 0; i < desc->natts; i++)
1232 : : {
1233 : 2947 : Form_pg_attribute att = TupleDescAttr(desc, i);
1234 : :
1235 [ + + + + ]: 2947 : if (att->attisdropped || att->attgenerated)
1236 : 17 : continue;
1237 : :
1238 : 2930 : attnums[nattnums++] = att->attnum;
1239 : : }
1240 : :
1241 [ + + ]: 1757 : if (nattnums > 0)
1242 : : {
382 1243 : 1735 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1244 : 1735 : nulls[2] = false;
1245 : : }
1246 : :
457 1247 : 1757 : table_close(rel, AccessShareLock);
1248 : : }
1249 : :
696 1250 : 1874 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1251 : :
1252 : 1874 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1253 : : }
1254 : :
2642 peter_e@gmx.net 1255 : 858 : SRF_RETURN_DONE(funcctx);
1256 : : }
|