Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * relation.c
3 : * PostgreSQL logical replication relation mapping cache
4 : *
5 : * Copyright (c) 2016-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/relation.c
9 : *
10 : * NOTES
11 : * Routines in this file mainly have to do with mapping the properties
12 : * of local replication target relations to the properties of their
13 : * remote counterpart.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/genam.h"
21 : #include "access/table.h"
22 : #include "catalog/namespace.h"
23 : #include "catalog/pg_am_d.h"
24 : #include "catalog/pg_subscription_rel.h"
25 : #include "executor/executor.h"
26 : #include "nodes/makefuncs.h"
27 : #include "replication/logicalrelation.h"
28 : #include "replication/worker_internal.h"
29 : #include "utils/inval.h"
30 :
31 :
32 : static MemoryContext LogicalRepRelMapContext = NULL;
33 :
34 : static HTAB *LogicalRepRelMap = NULL;
35 :
36 : /*
37 : * Partition map (LogicalRepPartMap)
38 : *
39 : * When a partitioned table is used as replication target, replicated
40 : * operations are actually performed on its leaf partitions, which requires
41 : * the partitions to also be mapped to the remote relation. Parent's entry
42 : * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
43 : * individual partitions may have different attribute numbers, which means
44 : * attribute mappings to remote relation's attributes must be maintained
45 : * separately for each partition.
46 : */
47 : static MemoryContext LogicalRepPartMapContext = NULL;
48 : static HTAB *LogicalRepPartMap = NULL;
49 : typedef struct LogicalRepPartMapEntry
50 : {
51 : Oid partoid; /* LogicalRepPartMap's key */
52 : LogicalRepRelMapEntry relmapentry;
53 : } LogicalRepPartMapEntry;
54 :
55 : static Oid FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
56 : AttrMap *attrMap);
57 :
58 : /*
59 : * Relcache invalidation callback for our relation map cache.
60 : */
61 : static void
2271 peter_e 62 GIC 670 : logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
63 : {
64 : LogicalRepRelMapEntry *entry;
65 :
66 : /* Just to be sure. */
2271 peter_e 67 CBC 670 : if (LogicalRepRelMap == NULL)
2271 peter_e 68 UIC 0 : return;
69 :
2271 peter_e 70 GIC 670 : if (reloid != InvalidOid)
71 : {
2271 peter_e 72 ECB : HASH_SEQ_STATUS status;
2271 peter_e 73 EUB :
2271 peter_e 74 GIC 670 : hash_seq_init(&status, LogicalRepRelMap);
2271 peter_e 75 ECB :
76 : /* TODO, use inverse lookup hashtable? */
2271 peter_e 77 GIC 2892 : while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
78 : {
2271 peter_e 79 CBC 2335 : if (entry->localreloid == reloid)
80 : {
935 tgl 81 GIC 113 : entry->localrelvalid = false;
2271 peter_e 82 CBC 113 : hash_seq_term(&status);
2271 peter_e 83 GIC 113 : break;
2271 peter_e 84 ECB : }
85 : }
86 : }
87 : else
88 : {
89 : /* invalidate all cache entries */
90 : HASH_SEQ_STATUS status;
91 :
2271 peter_e 92 UIC 0 : hash_seq_init(&status, LogicalRepRelMap);
93 :
94 0 : while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
935 tgl 95 0 : entry->localrelvalid = false;
96 : }
2271 peter_e 97 EUB : }
98 :
99 : /*
100 : * Initialize the relation map cache.
101 : */
102 : static void
2177 andres 103 GIC 276 : logicalrep_relmap_init(void)
104 : {
105 : HASHCTL ctl;
106 :
2271 peter_e 107 276 : if (!LogicalRepRelMapContext)
2271 peter_e 108 CBC 276 : LogicalRepRelMapContext =
2271 peter_e 109 GIC 276 : AllocSetContextCreate(CacheMemoryContext,
110 : "LogicalRepRelMapContext",
111 : ALLOCSET_DEFAULT_SIZES);
2271 peter_e 112 ECB :
113 : /* Initialize the relation hash table. */
2271 peter_e 114 CBC 276 : ctl.keysize = sizeof(LogicalRepRelId);
2271 peter_e 115 GIC 276 : ctl.entrysize = sizeof(LogicalRepRelMapEntry);
116 276 : ctl.hcxt = LogicalRepRelMapContext;
117 :
118 276 : LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
2271 peter_e 119 ECB : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
120 :
121 : /* Watch for invalidation events. */
2271 peter_e 122 GIC 276 : CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
2271 peter_e 123 ECB : (Datum) 0);
2271 peter_e 124 GIC 276 : }
125 :
126 : /*
2271 peter_e 127 ECB : * Free the entry of a relation map cache.
128 : */
129 : static void
2271 peter_e 130 GIC 138 : logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
131 : {
132 : LogicalRepRelation *remoterel;
133 :
134 138 : remoterel = &entry->remoterel;
2271 peter_e 135 ECB :
2271 peter_e 136 GIC 138 : pfree(remoterel->nspname);
137 138 : pfree(remoterel->relname);
138 :
2271 peter_e 139 CBC 138 : if (remoterel->natts > 0)
140 : {
2153 bruce 141 ECB : int i;
2271 peter_e 142 :
2271 peter_e 143 GIC 421 : for (i = 0; i < remoterel->natts; i++)
2271 peter_e 144 CBC 283 : pfree(remoterel->attnames[i]);
145 :
2271 peter_e 146 GIC 138 : pfree(remoterel->attnames);
147 138 : pfree(remoterel->atttyps);
2271 peter_e 148 ECB : }
2271 peter_e 149 CBC 138 : bms_free(remoterel->attkeys);
150 :
151 138 : if (entry->attrmap)
290 akapila 152 116 : free_attrmap(entry->attrmap);
2271 peter_e 153 GIC 138 : }
2271 peter_e 154 ECB :
155 : /*
156 : * Add new entry or update existing entry in the relation map cache.
157 : *
158 : * Called when new relation mapping is sent by the publisher to update
159 : * our expected view of incoming data from said publisher.
160 : */
161 : void
2271 peter_e 162 GIC 509 : logicalrep_relmap_update(LogicalRepRelation *remoterel)
163 : {
164 : MemoryContext oldctx;
165 : LogicalRepRelMapEntry *entry;
166 : bool found;
2153 bruce 167 ECB : int i;
168 :
2271 peter_e 169 GIC 509 : if (LogicalRepRelMap == NULL)
170 276 : logicalrep_relmap_init();
171 :
172 : /*
173 : * HASH_ENTER returns the existing entry if present or creates a new one.
2271 peter_e 174 ECB : */
62 peter 175 GNC 509 : entry = hash_search(LogicalRepRelMap, &remoterel->remoteid,
176 : HASH_ENTER, &found);
177 :
2271 peter_e 178 GIC 509 : if (found)
179 130 : logicalrep_relmap_free_entry(entry);
2271 peter_e 180 ECB :
2177 andres 181 GIC 509 : memset(entry, 0, sizeof(LogicalRepRelMapEntry));
182 :
2271 peter_e 183 ECB : /* Make cached copy of the data */
2271 peter_e 184 CBC 509 : oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
2271 peter_e 185 GIC 509 : entry->remoterel.remoteid = remoterel->remoteid;
2271 peter_e 186 CBC 509 : entry->remoterel.nspname = pstrdup(remoterel->nspname);
2271 peter_e 187 GIC 509 : entry->remoterel.relname = pstrdup(remoterel->relname);
188 509 : entry->remoterel.natts = remoterel->natts;
2271 peter_e 189 CBC 509 : entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
190 509 : entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
191 1415 : for (i = 0; i < remoterel->natts; i++)
2271 peter_e 192 ECB : {
2271 peter_e 193 CBC 906 : entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
194 906 : entry->remoterel.atttyps[i] = remoterel->atttyps[i];
2271 peter_e 195 ECB : }
2271 peter_e 196 CBC 509 : entry->remoterel.replident = remoterel->replident;
2271 peter_e 197 GIC 509 : entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
2271 peter_e 198 CBC 509 : MemoryContextSwitchTo(oldctx);
199 509 : }
200 :
2271 peter_e 201 ECB : /*
202 : * Find attribute index in TupleDesc struct by attribute name.
203 : *
204 : * Returns -1 if not found.
205 : */
206 : static int
2271 peter_e 207 GIC 1055 : logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
208 : {
209 : int i;
210 :
211 2041 : for (i = 0; i < remoterel->natts; i++)
2271 peter_e 212 ECB : {
2271 peter_e 213 GIC 1781 : if (strcmp(remoterel->attnames[i], attname) == 0)
214 795 : return i;
215 : }
2271 peter_e 216 ECB :
2271 peter_e 217 GIC 260 : return -1;
2271 peter_e 218 ECB : }
219 :
220 : /*
221 : * Report error with names of the missing local relation column(s), if any.
914 akapila 222 : */
223 : static void
914 akapila 224 GIC 447 : logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
225 : Bitmapset *missingatts)
226 : {
227 447 : if (!bms_is_empty(missingatts))
228 : {
914 akapila 229 ECB : StringInfoData missingattsbuf;
914 akapila 230 UIC 0 : int missingattcnt = 0;
231 : int i;
914 akapila 232 ECB :
914 akapila 233 UIC 0 : initStringInfo(&missingattsbuf);
234 :
38 tgl 235 UNC 0 : i = -1;
236 0 : while ((i = bms_next_member(missingatts, i)) >= 0)
237 : {
914 akapila 238 UIC 0 : missingattcnt++;
914 akapila 239 UBC 0 : if (missingattcnt == 1)
914 akapila 240 UIC 0 : appendStringInfo(&missingattsbuf, _("\"%s\""),
914 akapila 241 UBC 0 : remoterel->attnames[i]);
914 akapila 242 EUB : else
914 akapila 243 UIC 0 : appendStringInfo(&missingattsbuf, _(", \"%s\""),
914 akapila 244 UBC 0 : remoterel->attnames[i]);
914 akapila 245 EUB : }
246 :
914 akapila 247 UBC 0 : ereport(ERROR,
248 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
914 akapila 249 EUB : errmsg_plural("logical replication target relation \"%s.%s\" is missing replicated column: %s",
250 : "logical replication target relation \"%s.%s\" is missing replicated columns: %s",
251 : missingattcnt,
252 : remoterel->nspname,
253 : remoterel->relname,
254 : missingattsbuf.data)));
255 : }
914 akapila 256 GIC 447 : }
257 :
258 : /*
259 : * Check if replica identity matches and mark the updatable flag.
260 : *
261 : * We allow for stricter replica identity (fewer columns) on subscriber as
292 akapila 262 ECB : * that will not stop us from finding unique tuple. IE, if publisher has
263 : * identity (id,timestamp) and subscriber just (id) this will not be a
264 : * problem, but in the opposite scenario it will.
265 : *
266 : * We just mark the relation entry as not updatable here if the local
267 : * replica identity is found to be insufficient for applying
268 : * updates/deletes (inserts don't care!) and leave it to
269 : * check_relation_updatable() to throw the actual error if needed.
270 : */
271 : static void
292 akapila 272 GIC 461 : logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
273 : {
274 : Bitmapset *idkey;
275 461 : LogicalRepRelation *remoterel = &entry->remoterel;
276 : int i;
277 :
292 akapila 278 CBC 461 : entry->updatable = true;
279 :
292 akapila 280 GIC 461 : idkey = RelationGetIndexAttrBitmap(entry->localrel,
292 akapila 281 ECB : INDEX_ATTR_BITMAP_IDENTITY_KEY);
282 : /* fallback to PK if no replica identity */
292 akapila 283 GIC 461 : if (idkey == NULL)
292 akapila 284 ECB : {
292 akapila 285 GIC 146 : idkey = RelationGetIndexAttrBitmap(entry->localrel,
292 akapila 286 ECB : INDEX_ATTR_BITMAP_PRIMARY_KEY);
287 :
288 : /*
289 : * If no replica identity index and no PK, the published table must
290 : * have replica identity FULL.
291 : */
292 akapila 292 GIC 146 : if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
293 93 : entry->updatable = false;
294 : }
295 :
296 461 : i = -1;
297 759 : while ((i = bms_next_member(idkey, i)) >= 0)
292 akapila 298 ECB : {
292 akapila 299 CBC 320 : int attnum = i + FirstLowInvalidHeapAttributeNumber;
300 :
292 akapila 301 GIC 320 : if (!AttrNumberIsForUserDefinedAttr(attnum))
292 akapila 302 LBC 0 : ereport(ERROR,
292 akapila 303 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
304 : errmsg("logical replication target relation \"%s.%s\" uses "
305 : "system columns in REPLICA IDENTITY index",
306 : remoterel->nspname, remoterel->relname)));
307 :
292 akapila 308 GBC 320 : attnum = AttrNumberGetAttrOffset(attnum);
309 :
292 akapila 310 GIC 320 : if (entry->attrmap->attnums[attnum] < 0 ||
311 319 : !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
312 : {
313 22 : entry->updatable = false;
292 akapila 314 CBC 22 : break;
315 : }
292 akapila 316 ECB : }
292 akapila 317 CBC 461 : }
318 :
2271 peter_e 319 ECB : /*
320 : * Open the local relation associated with the remote one.
321 : *
322 : * Rebuilds the Relcache mapping if it was invalidated by local DDL.
323 : */
324 : LogicalRepRelMapEntry *
2271 peter_e 325 GIC 148039 : logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
326 : {
327 : LogicalRepRelMapEntry *entry;
328 : bool found;
329 : LogicalRepRelation *remoterel;
330 :
2271 peter_e 331 CBC 148039 : if (LogicalRepRelMap == NULL)
2271 peter_e 332 UIC 0 : logicalrep_relmap_init();
333 :
334 : /* Search for existing entry. */
62 peter 335 GNC 148039 : entry = hash_search(LogicalRepRelMap, &remoteid,
336 : HASH_FIND, &found);
2271 peter_e 337 ECB :
2271 peter_e 338 GBC 148039 : if (!found)
2271 peter_e 339 UIC 0 : elog(ERROR, "no relation map entry for remote relation ID %u",
340 : remoteid);
2271 peter_e 341 ECB :
1210 akapila 342 GIC 148039 : remoterel = &entry->remoterel;
343 :
935 tgl 344 ECB : /* Ensure we don't leak a relcache refcount. */
935 tgl 345 GBC 148039 : if (entry->localrel)
935 tgl 346 UIC 0 : elog(ERROR, "remote relation ID %u is already open", remoteid);
347 :
1210 akapila 348 ECB : /*
349 : * When opening and locking a relation, pending invalidation messages are
350 : * processed which can invalidate the relation. Hence, if the entry is
935 tgl 351 : * currently considered valid, try to open the local relation by OID and
935 tgl 352 EUB : * see if invalidation ensues.
353 : */
935 tgl 354 GIC 148039 : if (entry->localrelvalid)
355 : {
356 147585 : entry->localrel = try_table_open(entry->localreloid, lockmode);
357 147585 : if (!entry->localrel)
358 : {
359 : /* Table was renamed or dropped. */
935 tgl 360 LBC 0 : entry->localrelvalid = false;
361 : }
935 tgl 362 CBC 147585 : else if (!entry->localrelvalid)
935 tgl 363 ECB : {
364 : /* Note we release the no-longer-useful lock here. */
935 tgl 365 UIC 0 : table_close(entry->localrel, lockmode);
935 tgl 366 UBC 0 : entry->localrel = NULL;
367 : }
935 tgl 368 ECB : }
369 :
370 : /*
935 tgl 371 EUB : * If the entry has been marked invalid since we last had lock on it,
372 : * re-open the local relation by name and rebuild all derived data.
373 : */
935 tgl 374 GIC 148039 : if (!entry->localrelvalid)
375 : {
376 : Oid relid;
377 : TupleDesc desc;
378 : MemoryContext oldctx;
379 : int i;
914 akapila 380 ECB : Bitmapset *missingatts;
381 :
382 : /* Release the no-longer-useful attrmap, if any. */
290 akapila 383 GIC 454 : if (entry->attrmap)
384 : {
385 10 : free_attrmap(entry->attrmap);
386 10 : entry->attrmap = NULL;
387 : }
388 :
2271 peter_e 389 ECB : /* Try to find and lock the relation by name. */
2271 peter_e 390 GIC 454 : relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
2271 peter_e 391 ECB : remoterel->relname, -1),
392 : lockmode, true);
2271 peter_e 393 GIC 454 : if (!OidIsValid(relid))
394 7 : ereport(ERROR,
395 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2271 peter_e 396 ECB : errmsg("logical replication target relation \"%s.%s\" does not exist",
397 : remoterel->nspname, remoterel->relname)));
1539 andres 398 GIC 447 : entry->localrel = table_open(relid, NoLock);
935 tgl 399 CBC 447 : entry->localreloid = relid;
1210 akapila 400 ECB :
401 : /* Check for supported relkind. */
2154 peter_e 402 GIC 447 : CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
403 447 : remoterel->nspname, remoterel->relname);
2271 peter_e 404 ECB :
405 : /*
406 : * Build the mapping of local attribute numbers to remote attribute
407 : * numbers and validate that we don't miss any replicated columns as
2153 bruce 408 : * that would result in potentially unwanted data loss.
2271 peter_e 409 : */
2271 peter_e 410 GIC 447 : desc = RelationGetDescr(entry->localrel);
411 447 : oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
1208 michael 412 447 : entry->attrmap = make_attrmap(desc->natts);
2271 peter_e 413 447 : MemoryContextSwitchTo(oldctx);
414 :
415 : /* check and report missing attrs, if any */
914 akapila 416 CBC 447 : missingatts = bms_add_range(NULL, 0, remoterel->natts - 1);
2271 peter_e 417 1507 : for (i = 0; i < desc->natts; i++)
2271 peter_e 418 ECB : {
2152 419 : int attnum;
2058 andres 420 GIC 1060 : Form_pg_attribute attr = TupleDescAttr(desc, i);
421 :
1471 peter 422 CBC 1060 : if (attr->attisdropped || attr->attgenerated)
2071 peter_e 423 ECB : {
1208 michael 424 GIC 5 : entry->attrmap->attnums[i] = -1;
2152 peter_e 425 5 : continue;
2071 peter_e 426 ECB : }
427 :
2152 peter_e 428 CBC 1055 : attnum = logicalrep_rel_att_by_name(remoterel,
2058 andres 429 GIC 1055 : NameStr(attr->attname));
2153 bruce 430 ECB :
1208 michael 431 CBC 1055 : entry->attrmap->attnums[i] = attnum;
2271 peter_e 432 GIC 1055 : if (attnum >= 0)
914 akapila 433 795 : missingatts = bms_del_member(missingatts, attnum);
2271 peter_e 434 ECB : }
435 :
914 akapila 436 GIC 447 : logicalrep_report_missing_attrs(remoterel, missingatts);
914 akapila 437 ECB :
438 : /* be tidy */
914 akapila 439 CBC 447 : bms_free(missingatts);
440 :
441 : /*
292 akapila 442 ECB : * Set if the table's replica identity is enough to apply
443 : * update/delete.
444 : */
292 akapila 445 CBC 447 : logicalrep_rel_mark_updatable(entry);
446 :
447 : /*
448 : * Finding a usable index is an infrequent task. It occurs when an
449 : * operation is first performed on the relation, or after invalidation
450 : * of the relation cache entry (such as ANALYZE or CREATE/DROP index
451 : * on the relation).
452 : */
25 akapila 453 GNC 447 : entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel,
454 : entry->attrmap);
455 :
935 tgl 456 GIC 447 : entry->localrelvalid = true;
457 : }
458 :
2208 peter_e 459 148032 : if (entry->state != SUBREL_STATE_READY)
2208 peter_e 460 CBC 470 : entry->state = GetSubscriptionRelState(MySubscription->oid,
461 : entry->localreloid,
462 : &entry->statelsn);
463 :
2271 peter_e 464 GIC 148032 : return entry;
465 : }
466 :
467 : /*
2271 peter_e 468 ECB : * Close the previously opened logical relation.
469 : */
470 : void
2271 peter_e 471 CBC 148004 : logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
472 : {
1539 andres 473 GIC 148004 : table_close(rel->localrel, lockmode);
2271 peter_e 474 CBC 148004 : rel->localrel = NULL;
475 148004 : }
476 :
477 : /*
478 : * Partition cache: look up partition LogicalRepRelMapEntry's
1098 peter 479 ECB : *
480 : * Unlike relation map cache, this is keyed by partition OID, not remote
481 : * relation OID, because we only have to use this cache in the case where
482 : * partitions are not directly mapped to any remote relation, such as when
483 : * replication is occurring with one of their ancestors as target.
484 : */
485 :
486 : /*
487 : * Relcache invalidation callback
488 : */
489 : static void
1098 peter 490 CBC 241 : logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
491 : {
492 : LogicalRepPartMapEntry *entry;
493 :
494 : /* Just to be sure. */
1098 peter 495 GIC 241 : if (LogicalRepPartMap == NULL)
1098 peter 496 UIC 0 : return;
497 :
1098 peter 498 GIC 241 : if (reloid != InvalidOid)
499 : {
500 : HASH_SEQ_STATUS status;
501 :
502 241 : hash_seq_init(&status, LogicalRepPartMap);
503 :
504 : /* TODO, use inverse lookup hashtable? */
298 akapila 505 CBC 675 : while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
506 : {
298 akapila 507 GIC 440 : if (entry->relmapentry.localreloid == reloid)
508 : {
509 6 : entry->relmapentry.localrelvalid = false;
1098 peter 510 CBC 6 : hash_seq_term(&status);
1098 peter 511 GBC 6 : break;
512 : }
1098 peter 513 ECB : }
514 : }
515 : else
516 : {
517 : /* invalidate all cache entries */
518 : HASH_SEQ_STATUS status;
519 :
1098 peter 520 LBC 0 : hash_seq_init(&status, LogicalRepPartMap);
521 :
298 akapila 522 0 : while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
298 akapila 523 UIC 0 : entry->relmapentry.localrelvalid = false;
1098 peter 524 ECB : }
525 : }
526 :
527 : /*
528 : * Reset the entries in the partition map that refer to remoterel.
529 : *
530 : * Called when new relation mapping is sent by the publisher to update our
531 : * expected view of incoming data from said publisher.
532 : *
533 : * Note that we don't update the remoterel information in the entry here,
534 : * we will update the information in logicalrep_partition_open to avoid
297 akapila 535 EUB : * unnecessary work.
536 : */
537 : void
297 akapila 538 GBC 354 : logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
539 : {
540 : HASH_SEQ_STATUS status;
541 : LogicalRepPartMapEntry *part_entry;
542 : LogicalRepRelMapEntry *entry;
543 :
297 akapila 544 GIC 354 : if (LogicalRepPartMap == NULL)
545 320 : return;
546 :
547 34 : hash_seq_init(&status, LogicalRepPartMap);
548 87 : while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
549 : {
550 53 : entry = &part_entry->relmapentry;
551 :
552 53 : if (entry->remoterel.remoteid != remoterel->remoteid)
297 akapila 553 CBC 45 : continue;
554 :
297 akapila 555 GIC 8 : logicalrep_relmap_free_entry(entry);
556 :
557 8 : memset(entry, 0, sizeof(LogicalRepRelMapEntry));
558 : }
297 akapila 559 ECB : }
560 :
561 : /*
1098 peter 562 : * Initialize the partition map cache.
563 : */
564 : static void
1098 peter 565 CBC 5 : logicalrep_partmap_init(void)
566 : {
1098 peter 567 ECB : HASHCTL ctl;
568 :
1098 peter 569 GIC 5 : if (!LogicalRepPartMapContext)
1098 peter 570 CBC 5 : LogicalRepPartMapContext =
1098 peter 571 GIC 5 : AllocSetContextCreate(CacheMemoryContext,
1098 peter 572 ECB : "LogicalRepPartMapContext",
573 : ALLOCSET_DEFAULT_SIZES);
574 :
575 : /* Initialize the relation hash table. */
1098 peter 576 GIC 5 : ctl.keysize = sizeof(Oid); /* partition OID */
577 5 : ctl.entrysize = sizeof(LogicalRepPartMapEntry);
578 5 : ctl.hcxt = LogicalRepPartMapContext;
579 :
1098 peter 580 CBC 5 : LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
581 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
582 :
583 : /* Watch for invalidation events. */
584 5 : CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
1098 peter 585 ECB : (Datum) 0);
1098 peter 586 CBC 5 : }
587 :
588 : /*
589 : * logicalrep_partition_open
590 : *
1098 peter 591 ECB : * Returned entry reuses most of the values of the root table's entry, save
667 tgl 592 : * the attribute map, which can be different for the partition. However,
593 : * we must physically copy all the data, in case the root table's entry
594 : * gets freed/rebuilt.
1098 peter 595 : *
596 : * Note there's no logicalrep_partition_close, because the caller closes the
597 : * component relation.
598 : */
599 : LogicalRepRelMapEntry *
1098 peter 600 GIC 29 : logicalrep_partition_open(LogicalRepRelMapEntry *root,
1098 peter 601 ECB : Relation partrel, AttrMap *map)
602 : {
603 : LogicalRepRelMapEntry *entry;
604 : LogicalRepPartMapEntry *part_entry;
1098 peter 605 GIC 29 : LogicalRepRelation *remoterel = &root->remoterel;
606 29 : Oid partOid = RelationGetRelid(partrel);
607 29 : AttrMap *attrmap = root->attrmap;
608 : bool found;
609 : MemoryContext oldctx;
610 :
611 29 : if (LogicalRepPartMap == NULL)
612 5 : logicalrep_partmap_init();
613 :
614 : /* Search for existing entry. */
1098 peter 615 CBC 29 : part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
616 : &partOid,
617 : HASH_ENTER, &found);
618 :
298 akapila 619 GIC 29 : entry = &part_entry->relmapentry;
1098 peter 620 ECB :
292 akapila 621 : /*
622 : * We must always overwrite entry->localrel with the latest partition
623 : * Relation pointer, because the Relation pointed to by the old value may
624 : * have been cleared after the caller would have closed the partition
625 : * relation after the last use of this entry. Note that localrelvalid is
626 : * only updated by the relcache invalidation callback, so it may still be
627 : * true irrespective of whether the Relation pointed to by localrel has
628 : * been cleared or not.
629 : */
298 akapila 630 CBC 29 : if (found && entry->localrelvalid)
631 : {
292 akapila 632 GIC 15 : entry->localrel = partrel;
298 633 15 : return entry;
292 akapila 634 ECB : }
635 :
636 : /* Switch to longer-lived context. */
1098 peter 637 GIC 14 : oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
638 :
298 akapila 639 14 : if (!found)
640 : {
641 8 : memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
642 8 : part_entry->partoid = partOid;
643 : }
644 :
290 akapila 645 ECB : /* Release the no-longer-useful attrmap, if any. */
290 akapila 646 GIC 14 : if (entry->attrmap)
290 akapila 647 ECB : {
290 akapila 648 CBC 1 : free_attrmap(entry->attrmap);
290 akapila 649 GIC 1 : entry->attrmap = NULL;
650 : }
651 :
298 akapila 652 CBC 14 : if (!entry->remoterel.remoteid)
653 : {
298 akapila 654 ECB : int i;
655 :
656 : /* Remote relation is copied as-is from the root entry. */
298 akapila 657 CBC 13 : entry = &part_entry->relmapentry;
298 akapila 658 GIC 13 : entry->remoterel.remoteid = remoterel->remoteid;
659 13 : entry->remoterel.nspname = pstrdup(remoterel->nspname);
660 13 : entry->remoterel.relname = pstrdup(remoterel->relname);
298 akapila 661 CBC 13 : entry->remoterel.natts = remoterel->natts;
298 akapila 662 GIC 13 : entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
298 akapila 663 CBC 13 : entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
664 40 : for (i = 0; i < remoterel->natts; i++)
665 : {
298 akapila 666 GIC 27 : entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
298 akapila 667 CBC 27 : entry->remoterel.atttyps[i] = remoterel->atttyps[i];
668 : }
298 akapila 669 GIC 13 : entry->remoterel.replident = remoterel->replident;
670 13 : entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
671 : }
1098 peter 672 ECB :
1098 peter 673 CBC 14 : entry->localrel = partrel;
674 14 : entry->localreloid = partOid;
1098 peter 675 ECB :
676 : /*
677 : * If the partition's attributes don't match the root relation's, we'll
678 : * need to make a new attrmap which maps partition attribute numbers to
941 michael 679 : * remoterel's, instead of the original which maps root relation's
680 : * attribute numbers to remoterel's.
1098 peter 681 : *
682 : * Note that 'map' which comes from the tuple routing data structure
683 : * contains 1-based attribute numbers (of the parent relation). However,
684 : * the map in 'entry', a logical replication data structure, contains
685 : * 0-based attribute numbers (of the remote relation).
686 : */
1098 peter 687 GIC 14 : if (map)
1098 peter 688 ECB : {
689 : AttrNumber attno;
690 :
1098 peter 691 GIC 7 : entry->attrmap = make_attrmap(map->maplen);
692 30 : for (attno = 0; attno < entry->attrmap->maplen; attno++)
693 : {
694 23 : AttrNumber root_attno = map->attnums[attno];
695 :
696 : /* 0 means it's a dropped attribute. See comments atop AttrMap. */
298 akapila 697 23 : if (root_attno == 0)
698 2 : entry->attrmap->attnums[attno] = -1;
699 : else
700 21 : entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
701 : }
1098 peter 702 ECB : }
703 : else
704 : {
705 : /* Lacking copy_attmap, do this the hard way. */
667 tgl 706 CBC 7 : entry->attrmap = make_attrmap(attrmap->maplen);
707 7 : memcpy(entry->attrmap->attnums, attrmap->attnums,
667 tgl 708 GIC 7 : attrmap->maplen * sizeof(AttrNumber));
667 tgl 709 ECB : }
710 :
711 : /* Set if the table's replica identity is enough to apply update/delete. */
292 akapila 712 CBC 14 : logicalrep_rel_mark_updatable(entry);
1098 peter 713 ECB :
714 : /* state and statelsn are left set to 0. */
1098 peter 715 GIC 14 : MemoryContextSwitchTo(oldctx);
716 :
717 : /*
718 : * Finding a usable index is an infrequent task. It occurs when an
719 : * operation is first performed on the relation, or after invalidation of
720 : * the relation cache entry (such as ANALYZE or CREATE/DROP index on the
721 : * relation).
722 : *
723 : * We also prefer to run this code on the oldctx so that we do not leak
724 : * anything in the LogicalRepPartMapContext (hence CacheMemoryContext).
725 : */
25 akapila 726 GNC 14 : entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel,
727 : entry->attrmap);
728 :
729 14 : entry->localrelvalid = true;
730 :
1098 peter 731 GIC 14 : return entry;
732 : }
733 :
734 : /*
735 : * Returns true if the given index consists only of expressions such as:
736 : * CREATE INDEX idx ON table(foo(col));
737 : *
738 : * Returns false even if there is one column reference:
739 : * CREATE INDEX idx ON table(foo(col), col_2);
740 : */
741 : static bool
25 akapila 742 GNC 27 : IsIndexOnlyOnExpression(IndexInfo *indexInfo)
743 : {
744 29 : for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
745 : {
746 27 : AttrNumber attnum = indexInfo->ii_IndexAttrNumbers[i];
747 :
748 27 : if (AttributeNumberIsValid(attnum))
749 25 : return false;
750 : }
751 :
752 2 : return true;
753 : }
754 :
755 : /*
756 : * Returns true if the attrmap contains the leftmost column of the index.
757 : * Otherwise returns false.
758 : *
759 : * attrmap is a map of local attributes to remote ones. We can consult this
760 : * map to check whether the local index attribute has a corresponding remote
761 : * attribute.
762 : */
763 : static bool
764 16 : RemoteRelContainsLeftMostColumnOnIdx(IndexInfo *indexInfo, AttrMap *attrmap)
765 : {
766 : AttrNumber keycol;
767 :
768 16 : Assert(indexInfo->ii_NumIndexAttrs >= 1);
769 :
770 16 : keycol = indexInfo->ii_IndexAttrNumbers[0];
771 16 : if (!AttributeNumberIsValid(keycol))
772 2 : return false;
773 :
774 14 : if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol))
25 akapila 775 UNC 0 : return false;
776 :
25 akapila 777 GNC 14 : return attrmap->attnums[AttrNumberGetAttrOffset(keycol)] >= 0;
778 : }
779 :
780 : /*
781 : * Returns the oid of an index that can be used by the apply worker to scan
782 : * the relation. The index must be btree, non-partial, and have at least
783 : * one column reference (i.e. cannot consist of only expressions). These
784 : * limitations help to keep the index scan similar to PK/RI index scans.
785 : *
786 : * Note that the limitations of index scans for replica identity full only
787 : * adheres to a subset of the limitations of PK/RI. For example, we support
788 : * columns that are marked as [NULL] or we are not interested in the [NOT
789 : * DEFERRABLE] aspect of constraints here. It works for us because we always
790 : * compare the tuples for non-PK/RI index scans. See
791 : * RelationFindReplTupleByIndex().
792 : *
793 : * XXX: There are no fundamental problems for supporting non-btree indexes.
794 : * We mostly need to relax the limitations in RelationFindReplTupleByIndex().
795 : * For partial indexes, the required changes are likely to be larger. If
796 : * none of the tuples satisfy the expression for the index scan, we fall-back
797 : * to sequential execution, which might not be a good idea in some cases.
798 : *
799 : * We also skip indexes if the remote relation does not contain the leftmost
800 : * column of the index. This is because in most such cases sequential scan is
801 : * favorable over index scan.
802 : *
803 : * We expect to call this function when REPLICA IDENTITY FULL is defined for
804 : * the remote relation.
805 : *
806 : * If no suitable index is found, returns InvalidOid.
807 : */
808 : static Oid
809 47 : FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
810 : {
811 47 : List *idxlist = RelationGetIndexList(localrel);
812 : ListCell *lc;
813 :
814 51 : foreach(lc, idxlist)
815 : {
816 16 : Oid idxoid = lfirst_oid(lc);
817 : bool isUsableIdx;
818 : bool containsLeftMostCol;
819 : Relation idxRel;
820 : IndexInfo *idxInfo;
821 :
822 16 : idxRel = index_open(idxoid, AccessShareLock);
823 16 : idxInfo = BuildIndexInfo(idxRel);
824 16 : isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxInfo);
825 : containsLeftMostCol =
826 16 : RemoteRelContainsLeftMostColumnOnIdx(idxInfo, attrmap);
827 16 : index_close(idxRel, AccessShareLock);
828 :
829 : /* Return the first eligible index found */
830 16 : if (isUsableIdx && containsLeftMostCol)
831 12 : return idxoid;
832 : }
833 :
834 35 : return InvalidOid;
835 : }
836 :
837 : /*
838 : * Returns true if the index is usable for replica identity full. For details,
839 : * see FindUsableIndexForReplicaIdentityFull.
840 : */
841 : bool
842 27 : IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
843 : {
844 27 : bool is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
845 27 : bool is_partial = (indexInfo->ii_Predicate != NIL);
846 27 : bool is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
847 :
848 27 : return is_btree && !is_partial && !is_only_on_expression;
849 : }
850 :
851 : /*
852 : * Get replica identity index or if it is not defined a primary key.
853 : *
854 : * If neither is defined, returns InvalidOid
855 : */
856 : Oid
857 72485 : GetRelationIdentityOrPK(Relation rel)
858 : {
859 : Oid idxoid;
860 :
861 72485 : idxoid = RelationGetReplicaIndex(rel);
862 :
863 72485 : if (!OidIsValid(idxoid))
864 144 : idxoid = RelationGetPrimaryKeyIndex(rel);
865 :
866 72485 : return idxoid;
867 : }
868 :
869 : /*
870 : * Returns the index oid if we can use an index for subscriber. Otherwise,
871 : * returns InvalidOid.
872 : */
873 : static Oid
874 461 : FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
875 : AttrMap *attrMap)
876 : {
877 : Oid idxoid;
878 :
879 : /*
880 : * We never need index oid for partitioned tables, always rely on leaf
881 : * partition's index.
882 : */
883 461 : if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
884 47 : return InvalidOid;
885 :
886 : /*
887 : * Simple case, we already have a primary key or a replica identity index.
888 : */
889 414 : idxoid = GetRelationIdentityOrPK(localrel);
890 414 : if (OidIsValid(idxoid))
891 287 : return idxoid;
892 :
893 127 : if (remoterel->replident == REPLICA_IDENTITY_FULL)
894 : {
895 : /*
896 : * We are looking for one more opportunity for using an index. If
897 : * there are any indexes defined on the local relation, try to pick a
898 : * suitable index.
899 : *
900 : * The index selection safely assumes that all the columns are going
901 : * to be available for the index scan given that remote relation has
902 : * replica identity full.
903 : *
904 : * Note that we are not using the planner to find the cheapest method
905 : * to scan the relation as that would require us to either use lower
906 : * level planner functions which would be a maintenance burden in the
907 : * long run or use the full-fledged planner which could cause
908 : * overhead.
909 : */
910 47 : return FindUsableIndexForReplicaIdentityFull(localrel, attrMap);
911 : }
912 :
913 80 : return InvalidOid;
914 : }
|