Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/tableam.h"
21 : #include "access/xact.h"
22 : #include "catalog/indexing.h"
23 : #include "catalog/pg_subscription.h"
24 : #include "catalog/pg_subscription_rel.h"
25 : #include "catalog/pg_type.h"
26 : #include "miscadmin.h"
27 : #include "nodes/makefuncs.h"
28 : #include "storage/lmgr.h"
29 : #include "utils/array.h"
30 : #include "utils/builtins.h"
31 : #include "utils/fmgroids.h"
32 : #include "utils/lsyscache.h"
33 : #include "utils/pg_lsn.h"
34 : #include "utils/rel.h"
35 : #include "utils/syscache.h"
36 :
37 : static List *textarray_to_stringlist(ArrayType *textarray);
38 :
39 : /*
40 : * Fetch the subscription from the syscache.
41 : */
42 : Subscription *
2271 peter_e 43 CBC 564 : GetSubscription(Oid subid, bool missing_ok)
44 : {
45 : HeapTuple tup;
46 : Subscription *sub;
47 : Form_pg_subscription subform;
48 : Datum datum;
49 : bool isnull;
50 :
51 564 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
52 :
53 564 : if (!HeapTupleIsValid(tup))
54 : {
2271 peter_e 55 UBC 0 : if (missing_ok)
56 0 : return NULL;
57 :
58 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
59 : }
60 :
2271 peter_e 61 CBC 564 : subform = (Form_pg_subscription) GETSTRUCT(tup);
62 :
63 564 : sub = (Subscription *) palloc(sizeof(Subscription));
64 564 : sub->oid = subid;
65 564 : sub->dbid = subform->subdbid;
367 akapila 66 564 : sub->skiplsn = subform->subskiplsn;
2271 peter_e 67 564 : sub->name = pstrdup(NameStr(subform->subname));
68 564 : sub->owner = subform->subowner;
69 564 : sub->enabled = subform->subenabled;
995 tgl 70 564 : sub->binary = subform->subbinary;
948 akapila 71 564 : sub->stream = subform->substream;
634 72 564 : sub->twophasestate = subform->subtwophasestate;
391 73 564 : sub->disableonerr = subform->subdisableonerr;
10 rhaas 74 GNC 564 : sub->passwordrequired = subform->subpasswordrequired;
5 75 564 : sub->runasowner = subform->subrunasowner;
2271 peter_e 76 ECB :
77 : /* Get conninfo */
15 dgustafsson 78 GNC 564 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
79 : tup,
80 : Anum_pg_subscription_subconninfo);
2186 peter_e 81 CBC 564 : sub->conninfo = TextDatumGetCString(datum);
82 :
83 : /* Get slotname */
2271 84 564 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
85 : tup,
86 : Anum_pg_subscription_subslotname,
87 : &isnull);
2161 88 564 : if (!isnull)
89 531 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90 : else
91 33 : sub->slotname = NULL;
92 :
93 : /* Get synccommit */
15 dgustafsson 94 GNC 564 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
95 : tup,
96 : Anum_pg_subscription_subsynccommit);
2186 peter_e 97 GIC 564 : sub->synccommit = TextDatumGetCString(datum);
2186 peter_e 98 ECB :
99 : /* Get publications */
15 dgustafsson 100 GNC 564 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
101 : tup,
102 : Anum_pg_subscription_subpublications);
2271 peter_e 103 GIC 564 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
104 :
105 : /* Get origin */
15 dgustafsson 106 GNC 564 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
107 : tup,
108 : Anum_pg_subscription_suborigin);
262 akapila 109 564 : sub->origin = TextDatumGetCString(datum);
110 :
2271 peter_e 111 CBC 564 : ReleaseSysCache(tup);
112 :
113 564 : return sub;
114 : }
2271 peter_e 115 ECB :
116 : /*
117 : * Return number of subscriptions defined in given database.
118 : * Used by dropdb() to check if database can indeed be dropped.
119 : */
120 : int
2271 peter_e 121 GIC 20 : CountDBSubscriptions(Oid dbid)
122 : {
2153 bruce 123 CBC 20 : int nsubs = 0;
124 : Relation rel;
2153 bruce 125 ECB : ScanKeyData scankey;
126 : SysScanDesc scan;
127 : HeapTuple tup;
128 :
1539 andres 129 GIC 20 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
130 :
2271 peter_e 131 CBC 20 : ScanKeyInit(&scankey,
132 : Anum_pg_subscription_subdbid,
2271 peter_e 133 ECB : BTEqualStrategyNumber, F_OIDEQ,
134 : ObjectIdGetDatum(dbid));
135 :
2271 peter_e 136 GIC 20 : scan = systable_beginscan(rel, InvalidOid, false,
137 : NULL, 1, &scankey);
2271 peter_e 138 ECB :
2271 peter_e 139 GIC 20 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
2271 peter_e 140 UIC 0 : nsubs++;
2271 peter_e 141 ECB :
2271 peter_e 142 GBC 20 : systable_endscan(scan);
143 :
1539 andres 144 CBC 20 : table_close(rel, NoLock);
145 :
2271 peter_e 146 20 : return nsubs;
147 : }
2271 peter_e 148 ECB :
149 : /*
150 : * Free memory allocated by subscription struct.
151 : */
152 : void
2271 peter_e 153 GIC 26 : FreeSubscription(Subscription *sub)
154 : {
2271 peter_e 155 CBC 26 : pfree(sub->name);
2271 peter_e 156 GIC 26 : pfree(sub->conninfo);
2161 peter_e 157 CBC 26 : if (sub->slotname)
158 26 : pfree(sub->slotname);
2271 159 26 : list_free_deep(sub->publications);
160 26 : pfree(sub);
161 26 : }
2271 peter_e 162 ECB :
391 akapila 163 : /*
164 : * Disable the given subscription.
165 : */
166 : void
391 akapila 167 GIC 4 : DisableSubscription(Oid subid)
168 : {
391 akapila 169 ECB : Relation rel;
170 : bool nulls[Natts_pg_subscription];
171 : bool replaces[Natts_pg_subscription];
172 : Datum values[Natts_pg_subscription];
173 : HeapTuple tup;
174 :
175 : /* Look up the subscription in the catalog */
391 akapila 176 GIC 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
177 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
391 akapila 178 ECB :
391 akapila 179 CBC 4 : if (!HeapTupleIsValid(tup))
391 akapila 180 UIC 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
391 akapila 181 ECB :
391 akapila 182 GBC 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
183 :
391 akapila 184 ECB : /* Form a new tuple. */
391 akapila 185 GIC 4 : memset(values, 0, sizeof(values));
186 4 : memset(nulls, false, sizeof(nulls));
391 akapila 187 CBC 4 : memset(replaces, false, sizeof(replaces));
391 akapila 188 ECB :
189 : /* Set the subscription to disabled. */
391 akapila 190 GIC 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
191 4 : replaces[Anum_pg_subscription_subenabled - 1] = true;
391 akapila 192 ECB :
193 : /* Update the catalog */
391 akapila 194 GIC 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
195 : replaces);
391 akapila 196 CBC 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
391 akapila 197 GIC 4 : heap_freetuple(tup);
391 akapila 198 ECB :
391 akapila 199 CBC 4 : table_close(rel, NoLock);
391 akapila 200 GIC 4 : }
391 akapila 201 ECB :
202 : /*
2271 peter_e 203 : * Convert text array to list of strings.
204 : *
205 : * Note: the resulting list of strings is pallocated here.
206 : */
207 : static List *
2271 peter_e 208 CBC 564 : textarray_to_stringlist(ArrayType *textarray)
2271 peter_e 209 EUB : {
210 : Datum *elems;
2153 bruce 211 ECB : int nelems,
212 : i;
2153 bruce 213 CBC 564 : List *res = NIL;
214 :
282 peter 215 GNC 564 : deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
2271 peter_e 216 ECB :
2271 peter_e 217 GIC 564 : if (nelems == 0)
2271 peter_e 218 UIC 0 : return NIL;
2271 peter_e 219 ECB :
2271 peter_e 220 CBC 1458 : for (i = 0; i < nelems; i++)
2186 peter_e 221 GIC 894 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
222 :
2271 223 564 : return res;
224 : }
225 :
2208 peter_e 226 ECB : /*
227 : * Add new state record for a subscription table.
228 : */
229 : void
1829 peter_e 230 GIC 157 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
231 : XLogRecPtr sublsn)
232 : {
233 : Relation rel;
234 : HeapTuple tup;
2208 peter_e 235 ECB : bool nulls[Natts_pg_subscription_rel];
236 : Datum values[Natts_pg_subscription_rel];
237 :
2106 peter_e 238 GIC 157 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
239 :
1539 andres 240 CBC 157 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
241 :
242 : /* Try finding existing mapping. */
2208 peter_e 243 157 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
2208 peter_e 244 EUB : ObjectIdGetDatum(relid),
245 : ObjectIdGetDatum(subid));
1829 peter_e 246 GIC 157 : if (HeapTupleIsValid(tup))
1829 peter_e 247 UIC 0 : elog(ERROR, "subscription table %u in subscription %u already exists",
1829 peter_e 248 ECB : relid, subid);
249 :
250 : /* Form the tuple. */
1829 peter_e 251 GIC 157 : memset(values, 0, sizeof(values));
1829 peter_e 252 CBC 157 : memset(nulls, false, sizeof(nulls));
253 157 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
1829 peter_e 254 GIC 157 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
1829 peter_e 255 CBC 157 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
256 157 : if (sublsn != InvalidXLogRecPtr)
1829 peter_e 257 LBC 0 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
258 : else
1829 peter_e 259 CBC 157 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
260 :
261 157 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
262 :
263 : /* Insert tuple into catalog. */
1601 andres 264 GIC 157 : CatalogTupleInsert(rel, tup);
2208 peter_e 265 ECB :
1829 peter_e 266 GIC 157 : heap_freetuple(tup);
267 :
1829 peter_e 268 ECB : /* Cleanup. */
1539 andres 269 CBC 157 : table_close(rel, NoLock);
1829 peter_e 270 GIC 157 : }
271 :
272 : /*
273 : * Update the state of a subscription table.
274 : */
275 : void
276 596 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
1829 peter_e 277 ECB : XLogRecPtr sublsn)
278 : {
279 : Relation rel;
280 : HeapTuple tup;
281 : bool nulls[Natts_pg_subscription_rel];
282 : Datum values[Natts_pg_subscription_rel];
283 : bool replaces[Natts_pg_subscription_rel];
284 :
1829 peter_e 285 GIC 596 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
286 :
1539 andres 287 596 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
288 :
1829 peter_e 289 ECB : /* Try finding existing mapping. */
1829 peter_e 290 GIC 596 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
291 : ObjectIdGetDatum(relid),
1829 peter_e 292 ECB : ObjectIdGetDatum(subid));
1829 peter_e 293 GIC 596 : if (!HeapTupleIsValid(tup))
1829 peter_e 294 UIC 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
295 : relid, subid);
1829 peter_e 296 ECB :
297 : /* Update the tuple. */
1829 peter_e 298 CBC 596 : memset(values, 0, sizeof(values));
299 596 : memset(nulls, false, sizeof(nulls));
300 596 : memset(replaces, false, sizeof(replaces));
301 :
1829 peter_e 302 GIC 596 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
303 596 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
1829 peter_e 304 ECB :
1829 peter_e 305 GIC 596 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
306 596 : if (sublsn != InvalidXLogRecPtr)
1829 peter_e 307 CBC 293 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
308 : else
309 303 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
1829 peter_e 310 ECB :
1829 peter_e 311 GIC 596 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1829 peter_e 312 ECB : replaces);
313 :
314 : /* Update the catalog. */
1829 peter_e 315 CBC 596 : CatalogTupleUpdate(rel, &tup->t_self, tup);
316 :
2208 peter_e 317 ECB : /* Cleanup. */
1539 andres 318 GIC 596 : table_close(rel, NoLock);
2208 peter_e 319 CBC 596 : }
320 :
321 : /*
322 : * Get state of subscription table.
323 : *
324 : * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
325 : */
326 : char
906 alvherre 327 948 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
328 : {
329 : HeapTuple tup;
330 : char substate;
331 : bool isnull;
332 : Datum d;
786 akapila 333 ECB : Relation rel;
334 :
335 : /*
336 : * This is to avoid the race condition with AlterSubscription which tries
337 : * to remove this relstate.
338 : */
786 akapila 339 CBC 948 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
340 :
341 : /* Try finding the mapping. */
2208 peter_e 342 GIC 948 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
343 : ObjectIdGetDatum(relid),
344 : ObjectIdGetDatum(subid));
345 :
2208 peter_e 346 CBC 948 : if (!HeapTupleIsValid(tup))
347 : {
773 akapila 348 24 : table_close(rel, AccessShareLock);
906 alvherre 349 GIC 24 : *sublsn = InvalidXLogRecPtr;
350 24 : return SUBREL_STATE_UNKNOWN;
351 : }
352 :
353 : /* Get the state. */
354 924 : substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
355 :
906 alvherre 356 ECB : /* Get the LSN */
2208 peter_e 357 CBC 924 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
358 : Anum_pg_subscription_rel_srsublsn, &isnull);
2208 peter_e 359 GIC 924 : if (isnull)
360 480 : *sublsn = InvalidXLogRecPtr;
2208 peter_e 361 ECB : else
2208 peter_e 362 GIC 444 : *sublsn = DatumGetLSN(d);
363 :
364 : /* Cleanup */
365 924 : ReleaseSysCache(tup);
366 :
786 akapila 367 924 : table_close(rel, AccessShareLock);
368 :
2208 peter_e 369 924 : return substate;
2208 peter_e 370 ECB : }
371 :
2208 peter_e 372 EUB : /*
373 : * Drop subscription relation mapping. These can be for a particular
374 : * subscription, or for a particular relation, or both.
375 : */
376 : void
2208 peter_e 377 GIC 19265 : RemoveSubscriptionRel(Oid subid, Oid relid)
378 : {
379 : Relation rel;
380 : TableScanDesc scan;
381 : ScanKeyData skey[2];
382 : HeapTuple tup;
383 19265 : int nkeys = 0;
384 :
1539 andres 385 19265 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
386 :
2208 peter_e 387 19265 : if (OidIsValid(subid))
2208 peter_e 388 ECB : {
2208 peter_e 389 GIC 99 : ScanKeyInit(&skey[nkeys++],
2208 peter_e 390 ECB : Anum_pg_subscription_rel_srsubid,
391 : BTEqualStrategyNumber,
392 : F_OIDEQ,
393 : ObjectIdGetDatum(subid));
394 : }
395 :
2208 peter_e 396 GIC 19265 : if (OidIsValid(relid))
397 : {
398 19196 : ScanKeyInit(&skey[nkeys++],
399 : Anum_pg_subscription_rel_srrelid,
400 : BTEqualStrategyNumber,
401 : F_OIDEQ,
2208 peter_e 402 ECB : ObjectIdGetDatum(relid));
403 : }
404 :
405 : /* Do the search and delete what we found. */
1490 andres 406 GIC 19265 : scan = table_beginscan_catalog(rel, nkeys, skey);
2208 peter_e 407 19345 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
408 : {
786 akapila 409 ECB : Form_pg_subscription_rel subrel;
410 :
786 akapila 411 CBC 80 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
412 :
413 : /*
414 : * We don't allow to drop the relation mapping when the table
415 : * synchronization is in progress unless the caller updates the
786 akapila 416 ECB : * corresponding subscription as well. This is to ensure that we don't
417 : * leave tablesync slots or origins in the system when the
418 : * corresponding table is dropped.
419 : */
786 akapila 420 CBC 80 : if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
421 : {
786 akapila 422 UIC 0 : ereport(ERROR,
786 akapila 423 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
424 : errmsg("could not drop relation mapping for subscription \"%s\"",
425 : get_subscription_name(subrel->srsubid, false)),
426 : errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
427 : get_rel_name(relid), subrel->srsubstate),
428 :
429 : /*
430 : * translator: first %s is a SQL ALTER command and second %s is a
431 : * SQL DROP command
432 : */
433 : errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
434 : "ALTER SUBSCRIPTION ... ENABLE",
435 : "DROP SUBSCRIPTION ...")));
436 : }
437 :
2125 tgl 438 GIC 80 : CatalogTupleDelete(rel, &tup->t_self);
2208 peter_e 439 ECB : }
1490 andres 440 GIC 19265 : table_endscan(scan);
441 :
1539 andres 442 CBC 19265 : table_close(rel, RowExclusiveLock);
2208 peter_e 443 GIC 19265 : }
444 :
445 : /*
634 akapila 446 ECB : * Does the subscription have any relations?
447 : *
448 : * Use this function only to know true/false, and when you have no need for the
449 : * List returned by GetSubscriptionRelations.
450 : */
451 : bool
634 akapila 452 GIC 167 : HasSubscriptionRelations(Oid subid)
634 akapila 453 ECB : {
454 : Relation rel;
455 : ScanKeyData skey[1];
456 : SysScanDesc scan;
457 : bool has_subrels;
458 :
634 akapila 459 CBC 167 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
460 :
634 akapila 461 GIC 167 : ScanKeyInit(&skey[0],
634 akapila 462 ECB : Anum_pg_subscription_rel_srsubid,
463 : BTEqualStrategyNumber, F_OIDEQ,
464 : ObjectIdGetDatum(subid));
465 :
634 akapila 466 GIC 167 : scan = systable_beginscan(rel, InvalidOid, false,
467 : NULL, 1, skey);
468 :
634 akapila 469 ECB : /* If even a single tuple exists then the subscription has tables. */
634 akapila 470 GIC 167 : has_subrels = HeapTupleIsValid(systable_getnext(scan));
634 akapila 471 ECB :
472 : /* Cleanup */
634 akapila 473 CBC 167 : systable_endscan(scan);
474 167 : table_close(rel, AccessShareLock);
475 :
476 167 : return has_subrels;
634 akapila 477 ECB : }
478 :
2208 peter_e 479 : /*
480 : * Get the relations for the subscription.
481 : *
482 : * If not_ready is true, return only the relations that are not in a ready
483 : * state, otherwise return all the relations of the subscription. The
484 : * returned list is palloc'ed in the current memory context.
485 : */
486 : List *
256 michael 487 GNC 771 : GetSubscriptionRelations(Oid subid, bool not_ready)
488 : {
2208 peter_e 489 GIC 771 : List *res = NIL;
490 : Relation rel;
491 : HeapTuple tup;
492 771 : int nkeys = 0;
493 : ScanKeyData skey[2];
494 : SysScanDesc scan;
495 :
1539 andres 496 771 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
497 :
2208 peter_e 498 771 : ScanKeyInit(&skey[nkeys++],
499 : Anum_pg_subscription_rel_srsubid,
500 : BTEqualStrategyNumber, F_OIDEQ,
501 : ObjectIdGetDatum(subid));
502 :
256 michael 503 GNC 771 : if (not_ready)
504 733 : ScanKeyInit(&skey[nkeys++],
505 : Anum_pg_subscription_rel_srsubstate,
506 : BTEqualStrategyNumber, F_CHARNE,
507 : CharGetDatum(SUBREL_STATE_READY));
508 :
2208 peter_e 509 GIC 771 : scan = systable_beginscan(rel, InvalidOid, false,
510 : NULL, nkeys, skey);
511 :
512 2056 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
513 : {
514 : Form_pg_subscription_rel subrel;
515 : SubscriptionRelState *relstate;
516 : Datum d;
517 : bool isnull;
518 :
519 1285 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
520 :
2153 bruce 521 1285 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
2208 peter_e 522 1285 : relstate->relid = subrel->srrelid;
523 1285 : relstate->state = subrel->srsubstate;
993 tgl 524 1285 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
525 : Anum_pg_subscription_rel_srsublsn, &isnull);
526 1285 : if (isnull)
527 1044 : relstate->lsn = InvalidXLogRecPtr;
528 : else
529 241 : relstate->lsn = DatumGetLSN(d);
530 :
2208 peter_e 531 1285 : res = lappend(res, relstate);
532 : }
533 :
534 : /* Cleanup */
535 771 : systable_endscan(scan);
1539 andres 536 771 : table_close(rel, AccessShareLock);
537 :
2208 peter_e 538 771 : return res;
539 : }
|