Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "optimizer/optimizer.h"
26 : #include "parser/parse_relation.h"
27 : #include "replication/logical.h"
28 : #include "replication/logicalproto.h"
29 : #include "replication/origin.h"
30 : #include "replication/pgoutput.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
2271 peter_e 39 GIC 360 : PG_MODULE_MAGIC;
40 :
41 : static void pgoutput_startup(LogicalDecodingContext *ctx,
42 : OutputPluginOptions *opt, bool is_init);
43 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
44 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
45 : ReorderBufferTXN *txn);
46 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
47 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
48 : static void pgoutput_change(LogicalDecodingContext *ctx,
49 : ReorderBufferTXN *txn, Relation relation,
50 : ReorderBufferChange *change);
51 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
53 : ReorderBufferChange *change);
54 : static void pgoutput_message(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
56 : bool transactional, const char *prefix,
57 : Size sz, const char *message);
58 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
59 : RepOriginId origin_id);
60 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
61 : ReorderBufferTXN *txn);
62 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
63 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
64 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
65 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
66 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
67 : ReorderBufferTXN *txn,
68 : XLogRecPtr prepare_end_lsn,
69 : TimestampTz prepare_time);
70 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
71 : ReorderBufferTXN *txn);
72 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
73 : ReorderBufferTXN *txn);
74 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
75 : ReorderBufferTXN *txn,
76 : XLogRecPtr abort_lsn);
77 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr commit_lsn);
80 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
82 :
83 : static bool publications_valid;
84 : static bool in_streaming;
85 : static bool publish_no_origin;
86 :
87 : static List *LoadPublications(List *pubnames);
88 : static void publication_invalidation_cb(Datum arg, int cacheid,
89 : uint32 hashvalue);
90 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
91 : LogicalDecodingContext *ctx,
92 : Bitmapset *columns);
93 : static void send_repl_origin(LogicalDecodingContext *ctx,
94 : RepOriginId origin_id, XLogRecPtr origin_lsn,
95 : bool send_origin);
96 :
97 : /*
98 : * Only 3 publication actions are used for row filtering ("insert", "update",
99 : * "delete"). See RelationSyncEntry.exprstate[].
100 : */
enum RowFilterPubAction
{
	PUBACTION_INSERT,			/* row filter evaluated for INSERT */
	PUBACTION_UPDATE,			/* row filter evaluated for UPDATE */
	PUBACTION_DELETE			/* row filter evaluated for DELETE */
};

/*
 * Number of publication actions above, used to size
 * RelationSyncEntry.exprstate[].  Relies on PUBACTION_DELETE being the last
 * enumerator.
 */
#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
109 :
110 : /*
111 : * Entry in the map used to remember which relation schemas we sent.
112 : *
113 : * The schema_sent flag determines if the current schema record for the
114 : * relation (and for its ancestor if publish_as_relid is set) was already
115 : * sent to the subscriber (in which case we don't need to send it again).
116 : *
117 : * The schema cache on downstream is however updated only at commit time,
118 : * and with streamed transactions the commit order may be different from
119 : * the order the transactions are sent in. Also, the (sub) transactions
120 : * might get aborted so we need to send the schema for each (sub) transaction
121 : * so that we don't lose the schema information on abort. For handling this,
122 : * we maintain the list of xids (streamed_txns) for those we have already sent
123 : * the schema.
124 : *
125 : * For partitions, 'pubactions' considers not only the table's own
126 : * publications, but also those of all of its ancestors.
127 : */
typedef struct RelationSyncEntry
{
	Oid			relid;			/* relation oid */

	bool		replicate_valid;	/* overall validity flag for entry */

	bool		schema_sent;	/* schema already sent for non-streamed
								 * transactions? (streamed ones use
								 * streamed_txns below) */
	List	   *streamed_txns;	/* streamed toplevel transactions with this
								 * schema */

	/* are we publishing this rel? */
	PublicationActions pubactions;

	/*
	 * ExprState array for row filter. Different publication actions don't
	 * allow multiple expressions to always be combined into one, because
	 * updates or deletes restrict the column in expression to be part of the
	 * replica identity index whereas inserts do not have this restriction, so
	 * there is one ExprState per publication action.
	 */
	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
	EState	   *estate;			/* executor state used for row filter */
	TupleTableSlot *new_slot;	/* slot for storing new tuple */
	TupleTableSlot *old_slot;	/* slot for storing old tuple */

	/*
	 * OID of the relation to publish changes as.  For a partition, this may
	 * be set to one of its ancestors whose schema will be used when
	 * replicating changes, if publish_via_partition_root is set for the
	 * publication.
	 */
	Oid			publish_as_relid;

	/*
	 * Map used when replicating using an ancestor's schema to convert tuples
	 * from partition's type to the ancestor's; NULL if publish_as_relid is
	 * same as 'relid' or if unnecessary due to partition and the ancestor
	 * having identical TupleDesc.
	 */
	AttrMap    *attrmap;

	/*
	 * Columns included in the publication, or NULL if all columns are
	 * included implicitly.  Note that the attnums in this bitmap are not
	 * shifted by FirstLowInvalidHeapAttributeNumber.
	 */
	Bitmapset  *columns;

	/*
	 * Private context to store additional data for this entry - state for the
	 * row filter expressions, column list, etc.
	 */
	MemoryContext entry_cxt;
} RelationSyncEntry;
182 :
183 : /*
184 : * Maintain a per-transaction level variable to track whether the transaction
185 : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
186 : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
187 : * messages for empty transactions which saves network bandwidth.
188 : *
189 : * This optimization is not used for prepared transactions because if the
190 : * WALSender restarts after prepare of a transaction and before commit prepared
191 : * of the same transaction then we won't be able to figure out if we have
192 : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
193 : * because we would have lost the in-memory txndata information that was
194 : * present prior to the restart. This will result in sending a spurious
195 : * COMMIT PREPARED without a corresponding prepared transaction at the
196 : * downstream which would lead to an error when it tries to process it.
197 : *
198 : * XXX We could achieve this optimization by changing protocol to send
199 : * additional information so that downstream can detect that the corresponding
200 : * prepare has not been sent. However, adding such a check for every
201 : * transaction in the downstream could be costly so we might want to do it
202 : * optionally.
203 : *
204 : * We also don't have this optimization for streamed transactions because
205 : * they can contain prepared transactions.
206 : */
/* Per-transaction private state implementing the BEGIN deferral described above. */
typedef struct PGOutputTxnData
{
	bool		sent_begin_txn; /* flag indicating whether BEGIN has been sent */
} PGOutputTxnData;
211 :
212 : /* Map used to remember which relation schemas we sent. */
213 : static HTAB *RelationSyncCache = NULL;
214 :
215 : static void init_rel_sync_cache(MemoryContext cachectx);
216 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
217 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
218 : Relation relation);
219 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
220 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
221 : uint32 hashvalue);
222 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
223 : TransactionId xid);
224 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
225 : TransactionId xid);
226 : static void init_tuple_slot(PGOutputData *data, Relation relation,
227 : RelationSyncEntry *entry);
228 :
229 : /* row filter routines */
230 : static EState *create_estate_for_relation(Relation rel);
231 : static void pgoutput_row_filter_init(PGOutputData *data,
232 : List *publications,
233 : RelationSyncEntry *entry);
234 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
235 : ExprContext *econtext);
236 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
237 : TupleTableSlot **new_slot_ptr,
238 : RelationSyncEntry *entry,
239 : ReorderBufferChangeType *action);
240 :
241 : /* column list routines */
242 : static void pgoutput_column_list_init(PGOutputData *data,
243 : List *publications,
244 : RelationSyncEntry *entry);
245 :
246 : /*
247 : * Specify output plugin callbacks
248 : */
249 : void
2271 peter_e 250 CBC 511 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
251 : {
252 511 : cb->startup_cb = pgoutput_startup;
253 511 : cb->begin_cb = pgoutput_begin_txn;
254 511 : cb->change_cb = pgoutput_change;
1828 255 511 : cb->truncate_cb = pgoutput_truncate;
733 akapila 256 GIC 511 : cb->message_cb = pgoutput_message;
2271 peter_e 257 CBC 511 : cb->commit_cb = pgoutput_commit_txn;
634 akapila 258 ECB :
634 akapila 259 CBC 511 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
260 511 : cb->prepare_cb = pgoutput_prepare_txn;
261 511 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
262 511 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
2271 peter_e 263 GIC 511 : cb->filter_by_origin_cb = pgoutput_origin_filter;
264 511 : cb->shutdown_cb = pgoutput_shutdown;
948 akapila 265 ECB :
266 : /* transaction streaming */
948 akapila 267 CBC 511 : cb->stream_start_cb = pgoutput_stream_start;
268 511 : cb->stream_stop_cb = pgoutput_stream_stop;
269 511 : cb->stream_abort_cb = pgoutput_stream_abort;
270 511 : cb->stream_commit_cb = pgoutput_stream_commit;
271 511 : cb->stream_change_cb = pgoutput_change;
733 akapila 272 GIC 511 : cb->stream_message_cb = pgoutput_message;
948 akapila 273 CBC 511 : cb->stream_truncate_cb = pgoutput_truncate;
634 akapila 274 ECB : /* transaction streaming - two-phase commit */
613 akapila 275 GIC 511 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
2271 peter_e 276 511 : }
2271 peter_e 277 ECB :
/*
 * Parse the options supplied by the client at START_REPLICATION time into
 * *data.  Each option may be given at most once; a duplicate or an
 * unrecognized option raises an error.
 *
 * Note: the "origin" option also sets the file-level publish_no_origin flag
 * as a side effect.
 */
static void
parse_output_parameters(List *options, PGOutputData *data)
{
	ListCell   *lc;
	bool		protocol_version_given = false;
	bool		publication_names_given = false;
	bool		binary_option_given = false;
	bool		messages_option_given = false;
	bool		streaming_given = false;
	bool		two_phase_option_given = false;
	bool		origin_option_given = false;

	/* Establish defaults for all optional parameters. */
	data->binary = false;
	data->streaming = LOGICALREP_STREAM_OFF;
	data->messages = false;
	data->two_phase = false;

	foreach(lc, options)
	{
		DefElem    *defel = (DefElem *) lfirst(lc);

		Assert(defel->arg == NULL || IsA(defel->arg, String));

		/* Check each param, whether or not we recognize it */
		if (strcmp(defel->defname, "proto_version") == 0)
		{
			unsigned long parsed;
			char	   *endptr;

			if (protocol_version_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			protocol_version_given = true;

			/* Parse with strtoul so that trailing junk and overflow are detected. */
			errno = 0;
			parsed = strtoul(strVal(defel->arg), &endptr, 10);
			if (errno != 0 || *endptr != '\0')
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("invalid proto_version")));

			/* The protocol version is carried as uint32; reject larger values. */
			if (parsed > PG_UINT32_MAX)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("proto_version \"%s\" out of range",
								strVal(defel->arg))));

			data->protocol_version = (uint32) parsed;
		}
		else if (strcmp(defel->defname, "publication_names") == 0)
		{
			if (publication_names_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			publication_names_given = true;

			/* Comma-separated list of publication names. */
			if (!SplitIdentifierString(strVal(defel->arg), ',',
									   &data->publication_names))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_NAME),
						 errmsg("invalid publication_names syntax")));
		}
		else if (strcmp(defel->defname, "binary") == 0)
		{
			if (binary_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			binary_option_given = true;

			data->binary = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "messages") == 0)
		{
			if (messages_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			messages_option_given = true;

			data->messages = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "streaming") == 0)
		{
			if (streaming_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			streaming_given = true;

			/* off/on/parallel; validated against the protocol in pgoutput_startup */
			data->streaming = defGetStreamingMode(defel);
		}
		else if (strcmp(defel->defname, "two_phase") == 0)
		{
			if (two_phase_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			two_phase_option_given = true;

			data->two_phase = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "origin") == 0)
		{
			if (origin_option_given)
				ereport(ERROR,
						errcode(ERRCODE_SYNTAX_ERROR),
						errmsg("conflicting or redundant options"));
			origin_option_given = true;

			/* "none" suppresses remotely-originated changes; "any" sends all. */
			data->origin = defGetString(defel);
			if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
				publish_no_origin = true;
			else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
				publish_no_origin = false;
			else
				ereport(ERROR,
						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("unrecognized origin value: \"%s\"", data->origin));
		}
		else
			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
	}
}
404 :
2271 peter_e 405 ECB : /*
406 : * Initialize this plugin
407 : */
static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
				 bool is_init)
{
	/* Plugin-private state, attached to the decoding context below. */
	PGOutputData *data = palloc0(sizeof(PGOutputData));
	static bool publication_callback_registered = false;

	/* Create our memory context for private allocations. */
	data->context = AllocSetContextCreate(ctx->context,
										  "logical replication output context",
										  ALLOCSET_DEFAULT_SIZES);

	/* Separate context for cached state (row filters, column lists, etc.). */
	data->cachectx = AllocSetContextCreate(ctx->context,
										   "logical replication cache context",
										   ALLOCSET_DEFAULT_SIZES);

	ctx->output_plugin_private = data;

	/* This plugin uses binary protocol. */
	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;

	/*
	 * This is replication start and not slot initialization.
	 *
	 * Parse and validate options passed by the client.
	 */
	if (!is_init)
	{
		/* Parse the params and ERROR if we see any we don't recognize */
		parse_output_parameters(ctx->output_plugin_options, data);

		/* Check if we support requested protocol */
		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
							data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));

		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));

		/* publication_names is the only mandatory option. */
		if (data->publication_names == NIL)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("publication_names parameter missing")));

		/*
		 * Decide whether to enable streaming. It is disabled by default, in
		 * which case we just update the flag in decoding context. Otherwise
		 * we only allow it with sufficient version of the protocol, and when
		 * the output plugin supports it.
		 */
		if (data->streaming == LOGICALREP_STREAM_OFF)
			ctx->streaming = false;
		else if (data->streaming == LOGICALREP_STREAM_ON &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
		else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
		else if (!ctx->streaming)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("streaming requested, but not supported by output plugin")));

		/* Also remember we're currently not streaming any transaction. */
		in_streaming = false;

		/*
		 * Here, we just check whether the two-phase option is passed by
		 * plugin and decide whether to enable it at later point of time. It
		 * remains enabled if the previous start-up has done so. But we only
		 * allow the option to be passed in with sufficient version of the
		 * protocol, and when the output plugin supports it.
		 */
		if (!data->two_phase)
			ctx->twophase_opt_given = false;
		else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
		else if (!ctx->twophase)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("two-phase commit requested, but not supported by output plugin")));
		else
			ctx->twophase_opt_given = true;

		/* Init publication state. */
		data->publications = NIL;
		publications_valid = false;

		/*
		 * Register callback for pg_publication if we didn't already do that
		 * during some previous call in this process.
		 */
		if (!publication_callback_registered)
		{
			CacheRegisterSyscacheCallback(PUBLICATIONOID,
										  publication_invalidation_cb,
										  (Datum) 0);
			publication_callback_registered = true;
		}

		/* Initialize relation schema cache. */
		init_rel_sync_cache(CacheMemoryContext);
	}
	else
	{
		/*
		 * Disable the streaming and prepared transactions during the slot
		 * initialization mode.
		 */
		ctx->streaming = false;
		ctx->twophase = false;
	}
}
535 :
536 : /*
537 : * BEGIN callback.
375 akapila 538 ECB : *
539 : * Don't send the BEGIN message here instead postpone it until the first
540 : * change. In logical replication, a common scenario is to replicate a set of
541 : * tables (instead of all tables) and transactions whose changes were on
542 : * the table(s) that are not published will produce empty transactions. These
543 : * empty transactions will send BEGIN and COMMIT messages to subscribers,
544 : * using bandwidth on something with little/no use for logical replication.
545 : */
546 : static void
332 tgl 547 CBC 594 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
548 : {
332 tgl 549 GIC 594 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
550 : sizeof(PGOutputTxnData));
551 :
375 akapila 552 594 : txn->output_plugin_private = txndata;
553 594 : }
554 :
375 akapila 555 ECB : /*
556 : * Send BEGIN.
557 : *
558 : * This is called while processing the first change of the transaction.
559 : */
560 : static void
375 akapila 561 GIC 335 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
562 : {
2153 bruce 563 335 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
375 akapila 564 335 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
565 :
566 335 : Assert(txndata);
567 335 : Assert(!txndata->sent_begin_txn);
568 :
2271 peter_e 569 335 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
570 335 : logicalrep_write_begin(ctx->out, txn);
375 akapila 571 CBC 335 : txndata->sent_begin_txn = true;
572 :
634 573 335 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
574 : send_replication_origin);
575 :
2271 peter_e 576 335 : OutputPluginWrite(ctx, true);
577 335 : }
578 :
579 : /*
580 : * COMMIT callback
581 : */
582 : static void
2271 peter_e 583 GIC 590 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
584 : XLogRecPtr commit_lsn)
2271 peter_e 585 ECB : {
375 akapila 586 GIC 590 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
375 akapila 587 ECB : bool sent_begin_txn;
588 :
375 akapila 589 GIC 590 : Assert(txndata);
375 akapila 590 ECB :
591 : /*
592 : * We don't need to send the commit message unless some relevant change
593 : * from this transaction has been sent to the downstream.
594 : */
375 akapila 595 CBC 590 : sent_begin_txn = txndata->sent_begin_txn;
60 akapila 596 GNC 590 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
375 akapila 597 CBC 590 : pfree(txndata);
375 akapila 598 GIC 590 : txn->output_plugin_private = NULL;
599 :
375 akapila 600 CBC 590 : if (!sent_begin_txn)
375 akapila 601 ECB : {
375 akapila 602 GIC 256 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
603 256 : return;
604 : }
605 :
2271 peter_e 606 334 : OutputPluginPrepareWrite(ctx, true);
2271 peter_e 607 CBC 334 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
2271 peter_e 608 GIC 334 : OutputPluginWrite(ctx, true);
609 : }
2271 peter_e 610 ECB :
611 : /*
612 : * BEGIN PREPARE callback
634 akapila 613 : */
614 : static void
634 akapila 615 GIC 18 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
616 : {
617 18 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
618 :
634 akapila 619 CBC 18 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
620 18 : logicalrep_write_begin_prepare(ctx->out, txn);
634 akapila 621 ECB :
634 akapila 622 CBC 18 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
623 : send_replication_origin);
634 akapila 624 ECB :
634 akapila 625 GIC 18 : OutputPluginWrite(ctx, true);
634 akapila 626 CBC 18 : }
634 akapila 627 ECB :
628 : /*
629 : * PREPARE callback
630 : */
static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					 XLogRecPtr prepare_lsn)
{
	/*
	 * Prepared transactions are never skipped as empty (see PGOutputTxnData
	 * comments), so this is not a skipped-xact progress update.
	 */
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
	OutputPluginWrite(ctx, true);
}
634 akapila 641 ECB :
642 : /*
643 : * COMMIT PREPARED callback
644 : */
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
							 XLogRecPtr commit_lsn)
{
	/* Not a skipped transaction; report progress normally. */
	OutputPluginUpdateProgress(ctx, false);

	/* Send COMMIT PREPARED for the previously prepared transaction. */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);
}
655 :
634 akapila 656 ECB : /*
657 : * ROLLBACK PREPARED callback
658 : */
static void
pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
							   ReorderBufferTXN *txn,
							   XLogRecPtr prepare_end_lsn,
							   TimestampTz prepare_time)
{
	/* Not a skipped transaction; report progress normally. */
	OutputPluginUpdateProgress(ctx, false);

	/* Send ROLLBACK PREPARED for the previously prepared transaction. */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
									   prepare_time);
	OutputPluginWrite(ctx, true);
}
672 :
1828 peter_e 673 ECB : /*
674 : * Write the current schema of the relation and its ancestor (if any) if not
1096 peter 675 : * done yet.
1828 peter_e 676 : */
static void
maybe_send_schema(LogicalDecodingContext *ctx,
				  ReorderBufferChange *change,
				  Relation relation, RelationSyncEntry *relentry)
{
	bool		schema_sent;
	TransactionId xid = InvalidTransactionId;
	TransactionId topxid = InvalidTransactionId;

	/*
	 * Remember XID of the (sub)transaction for the change. We don't care if
	 * it's top-level transaction or not (we have already sent that XID in
	 * start of the current streaming block).
	 *
	 * If we're not in a streaming block, just use InvalidTransactionId and
	 * the write methods will not include it.
	 */
	if (in_streaming)
		xid = change->txn->xid;

	/* Schema tracking for streamed changes is keyed by the toplevel XID. */
	if (rbtxn_is_subtxn(change->txn))
		topxid = rbtxn_get_toptxn(change->txn)->xid;
	else
		topxid = xid;

	/*
	 * Do we need to send the schema? We do track streamed transactions
	 * separately, because those may be applied later (and the regular
	 * transactions won't see their effects until then) and in an order that
	 * we don't know at this point.
	 *
	 * XXX There is a scope of optimization here. Currently, we always send
	 * the schema first time in a streaming transaction but we can probably
	 * avoid that by checking 'relentry->schema_sent' flag. However, before
	 * doing that we need to study its impact on the case where we have a mix
	 * of streaming and non-streaming transactions.
	 */
	if (in_streaming)
		schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
	else
		schema_sent = relentry->schema_sent;

	/* Nothing to do if we already sent the schema. */
	if (schema_sent)
		return;

	/*
	 * Send the schema. If the changes will be published using an ancestor's
	 * schema, not the relation's own, send that ancestor's schema before
	 * sending relation's own (XXX - maybe sending only the former suffices?).
	 */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);

		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
		RelationClose(ancestor);
	}

	send_relation_and_attrs(relation, xid, ctx, relentry->columns);

	/* Record that the schema went out, under the appropriate tracking. */
	if (in_streaming)
		set_schema_sent_in_streamed_txn(relentry, topxid);
	else
		relentry->schema_sent = true;
}
743 :
1096 peter 744 ECB : /*
 745 : * Sends a relation
 : * NOTE(review): also emits TYPE messages for user-defined column types
 : * (atttypid >= FirstGenbkiObjectId), skipping dropped/generated columns
 : * and columns excluded by the publication column list, before writing
 : * the RELATION message itself.
 746 : */
 747 : static void
948 akapila 748 GIC 282 : send_relation_and_attrs(Relation relation, TransactionId xid,
 749 : LogicalDecodingContext *ctx,
 750 : Bitmapset *columns)
 751 : {
1096 peter 752 CBC 282 : TupleDesc desc = RelationGetDescr(relation);
 753 : int i;
1828 peter_e 754 ECB :
 755 : /*
1096 peter 756 : * Write out type info if needed. We do that only for user-created types.
 757 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
 758 : * objects with hand-assigned OIDs to be "built in", not for instance any
 759 : * function or type defined in the information_schema. This is important
 760 : * because only hand-assigned OIDs can be expected to remain stable across
 761 : * major versions.
 762 : */
1096 peter 763 CBC 857 : for (i = 0; i < desc->natts; i++)
 764 : {
 765 575 : Form_pg_attribute att = TupleDescAttr(desc, i);
 766 :
1096 peter 767 GIC 575 : if (att->attisdropped || att->attgenerated)
 768 5 : continue;
 769 :
 770 570 : if (att->atttypid < FirstGenbkiObjectId)
 771 552 : continue;
1828 peter_e 772 ECB :
 773 : /* Skip this attribute if it's not present in the column list */
379 tomas.vondra 774 GIC 18 : if (columns != NULL && !bms_is_member(att->attnum, columns))
379 tomas.vondra 775 UIC 0 : continue;
379 tomas.vondra 776 ECB :
1828 peter_e 777 GIC 18 : OutputPluginPrepareWrite(ctx, false);
948 akapila 778 18 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
1828 peter_e 779 18 : OutputPluginWrite(ctx, false);
 780 : }
 781 :
1096 peter 782 282 : OutputPluginPrepareWrite(ctx, false);
379 tomas.vondra 783 282 : logicalrep_write_rel(ctx->out, xid, relation, columns);
1096 peter 784 282 : OutputPluginWrite(ctx, false);
1828 peter_e 785 282 : }
786 :
411 akapila 787 ECB : /*
 788 : * Executor state preparation for evaluation of row filter expressions for the
 789 : * specified relation.
 : * NOTE(review): builds a one-entry range table for 'rel' (AccessShareLock)
 : * with its RTEPermissionInfo; the returned EState is cached by the caller
 : * in the relation sync entry, which is responsible for its lifetime.
 790 : */
 791 : static EState *
411 akapila 792 CBC 16 : create_estate_for_relation(Relation rel)
 793 : {
411 akapila 794 ECB : EState *estate;
 795 : RangeTblEntry *rte;
34 tgl 796 GNC 16 : List *perminfos = NIL;
 797 :
411 akapila 798 GIC 16 : estate = CreateExecutorState();
411 akapila 799 ECB :
411 akapila 800 GBC 16 : rte = makeNode(RangeTblEntry);
411 akapila 801 GIC 16 : rte->rtekind = RTE_RELATION;
411 akapila 802 CBC 16 : rte->relid = RelationGetRelid(rel);
 803 16 : rte->relkind = rel->rd_rel->relkind;
 804 16 : rte->rellockmode = AccessShareLock;
 805 :
34 tgl 806 GNC 16 : addRTEPermissionInfo(&perminfos, rte);
 807 :
 808 16 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
 809 :
411 akapila 810 CBC 16 : estate->es_output_cid = GetCurrentCommandId(false);
411 akapila 811 ECB :
411 akapila 812 CBC 16 : return estate;
411 akapila 813 ECB : }
814 :
 815 : /*
 816 : * Evaluates row filter.
 817 : *
 818 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
 819 : * isn't replicated.
 : * NOTE(review): the tuple under test must already be in
 : * econtext->ecxt_scantuple; evaluation happens in the per-tuple memory
 : * context via ExecEvalExprSwitchContext.
 820 : */
 821 : static bool
411 akapila 822 GIC 36 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
 823 : {
411 akapila 824 ECB : Datum ret;
 825 : bool isnull;
 826 :
411 akapila 827 GIC 36 : Assert(state != NULL);
411 akapila 828 ECB :
411 akapila 829 CBC 36 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
411 akapila 830 ECB :
411 akapila 831 CBC 36 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
411 akapila 832 ECB : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
 833 : isnull ? "true" : "false");
 834 :
411 akapila 835 GIC 36 : if (isnull)
411 akapila 836 CBC 1 : return false;
 837 :
 838 35 : return DatumGetBool(ret);
 839 : }
411 akapila 840 ECB :
 841 : /*
 842 : * Make sure the per-entry memory context exists.
 : * NOTE(review): the context is a child of data->cachectx and is named
 : * after the published relation. 'relation' is opened with
 : * RelationIdGetRelation but not closed here — presumably released by
 : * resource-owner cleanup; TODO confirm this is intentional.
 843 : */
 844 : static void
379 tomas.vondra 845 GIC 53 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
 846 : {
 847 : Relation relation;
 848 :
 849 : /* The context may already exist, in which case bail out. */
379 tomas.vondra 850 CBC 53 : if (entry->entry_cxt)
379 tomas.vondra 851 GIC 2 : return;
 852 :
 853 51 : relation = RelationIdGetRelation(entry->publish_as_relid);
 854 :
379 tomas.vondra 855 CBC 51 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
 856 : "entry private context",
379 tomas.vondra 857 ECB : ALLOCSET_SMALL_SIZES);
 858 :
379 tomas.vondra 859 CBC 51 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
 860 : RelationGetRelationName(relation));
 861 : }
862 :
411 akapila 863 ECB : /*
 864 : * Initialize the row filter.
 : * NOTE(review): collects per-pubaction row-filter quals from all
 : * subscribed publications; a FOR ALL TABLES / schema publication (or any
 : * publication without a filter) disables filtering for the pubactions it
 : * publishes. Surviving quals are OR-combined and cached as ExprStates in
 : * entry->exprstate[], with entry->estate built in entry->entry_cxt.
 865 : */
 866 : static void
411 akapila 867 GIC 222 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
 868 : RelationSyncEntry *entry)
 869 : {
 870 : ListCell *lc;
 871 222 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
 872 222 : bool no_filter[] = {false, false, false}; /* One per pubaction */
411 akapila 873 ECB : MemoryContext oldctx;
 874 : int idx;
411 akapila 875 GIC 222 : bool has_filter = true;
198 876 222 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
 877 :
411 akapila 878 ECB : /*
 879 : * Find if there are any row filters for this relation. If there are, then
 880 : * prepare the necessary ExprState and cache it in entry->exprstate. To
 881 : * build an expression state, we need to ensure the following:
 882 : *
 883 : * All the given publication-table mappings must be checked.
 884 : *
 885 : * Multiple publications might have multiple row filters for this
 886 : * relation. Since row filter usage depends on the DML operation, there
 887 : * are multiple lists (one for each operation) to which row filters will
 888 : * be appended.
 889 : *
 890 : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row
 891 : * filter expression" so it takes precedence.
 892 : */
411 akapila 893 GIC 242 : foreach(lc, publications)
 894 : {
411 akapila 895 CBC 226 : Publication *pub = lfirst(lc);
411 akapila 896 GIC 226 : HeapTuple rftuple = NULL;
 897 226 : Datum rfdatum = 0;
198 898 226 : bool pub_no_filter = true;
411 akapila 899 ECB :
198 900 : /*
 901 : * If the publication is FOR ALL TABLES, or the publication includes a
 902 : * FOR TABLES IN SCHEMA where the table belongs to the referred
 903 : * schema, then it is treated the same as if there are no row filters
 904 : * (even if other publications have a row filter).
 905 : */
198 akapila 906 GIC 226 : if (!pub->alltables &&
 907 166 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
 908 : ObjectIdGetDatum(schemaid),
 909 : ObjectIdGetDatum(pub->oid)))
 910 : {
 911 : /*
 912 : * Check for the presence of a row filter in this publication.
 913 : */
411 914 160 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
 915 : ObjectIdGetDatum(entry->publish_as_relid),
 916 : ObjectIdGetDatum(pub->oid));
 917 :
 918 160 : if (HeapTupleIsValid(rftuple))
 919 : {
 920 : /* Null indicates no filter. */
411 akapila 921 CBC 148 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
 922 : Anum_pg_publication_rel_prqual,
411 akapila 923 ECB : &pub_no_filter);
 924 : }
 925 : }
 926 :
411 akapila 927 GIC 226 : if (pub_no_filter)
 928 : {
 929 213 : if (rftuple)
 930 135 : ReleaseSysCache(rftuple);
 931 :
 932 213 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
 933 213 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
411 akapila 934 CBC 213 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
411 akapila 935 ECB :
 936 : /*
 937 : * Quick exit if all the DML actions are publicized via this
 938 : * publication.
 939 : */
411 akapila 940 GIC 213 : if (no_filter[PUBACTION_INSERT] &&
 941 213 : no_filter[PUBACTION_UPDATE] &&
411 akapila 942 CBC 206 : no_filter[PUBACTION_DELETE])
 943 : {
411 akapila 944 GIC 206 : has_filter = false;
 945 206 : break;
411 akapila 946 ECB : }
 947 :
 948 : /* No additional work for this publication. Next one. */
411 akapila 949 CBC 7 : continue;
 950 : }
 951 :
 952 : /* Form the per pubaction row filter lists. */
411 akapila 953 GIC 13 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
 954 13 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
411 akapila 955 CBC 13 : TextDatumGetCString(rfdatum));
411 akapila 956 GIC 13 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
411 akapila 957 CBC 13 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
 958 13 : TextDatumGetCString(rfdatum));
411 akapila 959 GIC 13 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
411 akapila 960 CBC 13 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
 961 13 : TextDatumGetCString(rfdatum));
411 akapila 962 ECB :
411 akapila 963 GIC 13 : ReleaseSysCache(rftuple);
 964 : } /* loop all subscribed publications */
 965 :
 966 : /* Clean the row filter */
 967 888 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
411 akapila 968 ECB : {
411 akapila 969 CBC 666 : if (no_filter[idx])
411 akapila 970 ECB : {
411 akapila 971 GIC 627 : list_free_deep(rfnodes[idx]);
411 akapila 972 CBC 627 : rfnodes[idx] = NIL;
411 akapila 973 ECB : }
 974 : }
 975 :
411 akapila 976 GIC 222 : if (has_filter)
411 akapila 977 ECB : {
411 akapila 978 GIC 16 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
 979 :
379 tomas.vondra 980 16 : pgoutput_ensure_entry_cxt(data, entry);
411 akapila 981 ECB :
 982 : /*
 983 : * Now all the filters for all pubactions are known. Combine them when
 984 : * their pubactions are the same.
 985 : */
379 tomas.vondra 986 CBC 16 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
411 akapila 987 16 : entry->estate = create_estate_for_relation(relation);
 988 64 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
411 akapila 989 ECB : {
411 akapila 990 GIC 48 : List *filters = NIL;
411 akapila 991 ECB : Expr *rfnode;
 992 :
411 akapila 993 GIC 48 : if (rfnodes[idx] == NIL)
 994 21 : continue;
411 akapila 995 ECB :
411 akapila 996 GIC 57 : foreach(lc, rfnodes[idx])
411 akapila 997 CBC 30 : filters = lappend(filters, stringToNode((char *) lfirst(lc)));
 998 :
411 akapila 999 ECB : /* combine the row filter and cache the ExprState */
411 akapila 1000 CBC 27 : rfnode = make_orclause(filters);
411 akapila 1001 GIC 27 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
 1002 : } /* for each pubaction */
 1003 16 : MemoryContextSwitchTo(oldctx);
411 akapila 1004 ECB :
411 akapila 1005 GIC 16 : RelationClose(relation);
411 akapila 1006 ECB : }
411 akapila 1007 GIC 222 : }
411 akapila 1008 ECB :
 1009 : /*
 1010 : * Initialize the column list.
 : * NOTE(review): computes entry->columns from the subscribed publications;
 : * a list covering all live (non-dropped, non-generated) attributes is
 : * normalized to NULL, and differing lists across publications raise an
 : * ERROR.
 1011 : */
 1012 : static void
379 tomas.vondra 1013 GIC 222 : pgoutput_column_list_init(PGOutputData *data, List *publications,
379 tomas.vondra 1014 ECB : RelationSyncEntry *entry)
 1015 : {
 1016 : ListCell *lc;
311 akapila 1017 GIC 222 : bool first = true;
311 akapila 1018 CBC 222 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
 1019 :
 1020 : /*
379 tomas.vondra 1021 ECB : * Find if there are any column lists for this relation. If there are,
311 akapila 1022 : * build a bitmap using the column lists.
 1023 : *
332 tgl 1024 : * Multiple publications might have multiple column lists for this
 1025 : * relation.
 1026 : *
 1027 : * Note that we don't support the case where the column list is different
311 akapila 1028 : * for the same table when combining publications. See comments atop
 1029 : * fetch_table_list. But one can later change the publication so we still
 1030 : * need to check all the given publication-table mappings and report an
 1031 : * error if any publications have a different column list.
 1032 : *
199 alvherre 1033 : * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
 1034 : */
379 tomas.vondra 1035 CBC 454 : foreach(lc, publications)
 1036 : {
379 tomas.vondra 1037 GIC 233 : Publication *pub = lfirst(lc);
 1038 233 : HeapTuple cftuple = NULL;
 1039 233 : Datum cfdatum = 0;
311 akapila 1040 233 : Bitmapset *cols = NULL;
379 tomas.vondra 1041 ECB :
 1042 : /*
 1043 : * If the publication is FOR ALL TABLES then it is treated the same as
 1044 : * if there are no column lists (even if other publications have a
332 tgl 1045 : * list).
379 tomas.vondra 1046 : */
379 tomas.vondra 1047 GIC 233 : if (!pub->alltables)
 1048 : {
311 akapila 1049 172 : bool pub_no_list = true;
 1050 :
 1051 : /*
 1052 : * Check for the presence of a column list in this publication.
 1053 : *
 1054 : * Note: If we find no pg_publication_rel row, it's a publication
 1055 : * defined for a whole schema, so it can't have a column list,
 1056 : * just like a FOR ALL TABLES publication.
 1057 : */
379 tomas.vondra 1058 172 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
 1059 : ObjectIdGetDatum(entry->publish_as_relid),
 1060 : ObjectIdGetDatum(pub->oid));
 1061 :
 1062 172 : if (HeapTupleIsValid(cftuple))
379 tomas.vondra 1063 ECB : {
 1064 : /* Lookup the column list attribute. */
379 tomas.vondra 1065 CBC 153 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
379 tomas.vondra 1066 ECB : Anum_pg_publication_rel_prattrs,
 1067 : &pub_no_list);
 1068 :
 1069 : /* Build the column list bitmap in the per-entry context. */
379 tomas.vondra 1070 GIC 153 : if (!pub_no_list) /* when not null */
 1071 : {
 1072 : int i;
86 akapila 1073 GNC 37 : int nliveatts = 0;
 1074 37 : TupleDesc desc = RelationGetDescr(relation);
 1075 :
379 tomas.vondra 1076 GIC 37 : pgoutput_ensure_entry_cxt(data, entry);
 1077 :
311 akapila 1078 37 : cols = pub_collist_to_bitmapset(cols, cfdatum,
311 akapila 1079 ECB : entry->entry_cxt);
 1080 :
 1081 : /* Get the number of live attributes. */
86 akapila 1082 GNC 155 : for (i = 0; i < desc->natts; i++)
 1083 : {
 1084 118 : Form_pg_attribute att = TupleDescAttr(desc, i);
 1085 :
 1086 118 : if (att->attisdropped || att->attgenerated)
 1087 2 : continue;
 1088 :
 1089 116 : nliveatts++;
 1090 : }
 1091 :
311 akapila 1092 ECB : /*
 1093 : * If column list includes all the columns of the table,
 1094 : * set it to NULL.
 1095 : */
86 akapila 1096 GNC 37 : if (bms_num_members(cols) == nliveatts)
 1097 : {
311 akapila 1098 GIC 7 : bms_free(cols);
 1099 7 : cols = NULL;
 1100 : }
379 tomas.vondra 1101 ECB : }
 1102 :
311 akapila 1103 GIC 153 : ReleaseSysCache(cftuple);
 1104 : }
379 tomas.vondra 1105 ECB : }
 1106 :
311 akapila 1107 GIC 233 : if (first)
379 tomas.vondra 1108 ECB : {
311 akapila 1109 GIC 222 : entry->columns = cols;
 1110 222 : first = false;
 1111 : }
 1112 11 : else if (!bms_equal(entry->columns, cols))
311 akapila 1113 CBC 1 : ereport(ERROR,
 1114 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 1115 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
311 akapila 1116 ECB : get_namespace_name(RelationGetNamespace(relation)),
 1117 : RelationGetRelationName(relation)));
 1118 : } /* loop all subscribed publications */
 1119 :
311 akapila 1120 GIC 221 : RelationClose(relation);
379 tomas.vondra 1121 CBC 221 : }
1122 :
 1123 : /*
 1124 : * Initialize the slot for storing new and old tuples, and build the map that
 1125 : * will be used to convert the relation's tuples into the ancestor's format.
 : * NOTE(review): slots live in data->cachectx; the attribute map (built only
 : * when publishing via an ancestor) is allocated in CacheMemoryContext so it
 : * survives for the whole session.
 1126 : */
 1127 : static void
411 akapila 1128 GIC 222 : init_tuple_slot(PGOutputData *data, Relation relation,
411 akapila 1129 ECB : RelationSyncEntry *entry)
 1130 : {
 1131 : MemoryContext oldctx;
 1132 : TupleDesc oldtupdesc;
 1133 : TupleDesc newtupdesc;
 1134 :
411 akapila 1135 GIC 222 : oldctx = MemoryContextSwitchTo(data->cachectx);
 1136 :
 1137 : /*
 1138 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
411 akapila 1139 ECB : * live as long as the cache remains.
 1140 : */
411 akapila 1141 CBC 222 : oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
 1142 222 : newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
 1143 :
411 akapila 1144 GIC 222 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
 1145 222 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
411 akapila 1146 ECB :
411 akapila 1147 GIC 222 : MemoryContextSwitchTo(oldctx);
 1148 :
 1149 : /*
411 akapila 1150 ECB : * Cache the map that will be used to convert the relation's tuples into
 1151 : * the ancestor's format, if needed.
 1152 : */
411 akapila 1153 CBC 222 : if (entry->publish_as_relid != RelationGetRelid(relation))
 1154 : {
 1155 26 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
 1156 26 : TupleDesc indesc = RelationGetDescr(relation);
411 akapila 1157 GIC 26 : TupleDesc outdesc = RelationGetDescr(ancestor);
 1158 :
 1159 : /* Map must live as long as the session does. */
 1160 26 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 1161 :
131 alvherre 1162 GNC 26 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
411 akapila 1163 ECB :
411 akapila 1164 CBC 26 : MemoryContextSwitchTo(oldctx);
411 akapila 1165 GIC 26 : RelationClose(ancestor);
 1166 : }
 1167 222 : }
1168 :
 1169 : /*
 1170 : * Change is checked against the row filter if any.
 1171 : *
 1172 : * Returns true if the change is to be replicated, else false.
 1173 : *
 1174 : * For inserts, evaluate the row filter for new tuple.
 1175 : * For deletes, evaluate the row filter for old tuple.
 1176 : * For updates, evaluate the row filter for old and new tuple.
 1177 : *
 1178 : * For updates, if both evaluations are true, we allow sending the UPDATE and
 1179 : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
 1180 : * only one of the tuples matches the row filter expression, we transform
 1181 : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
 1182 : * following rules:
 1183 : *
 1184 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
 1185 : * Case 2: old-row (no match) new row (match) -> INSERT
 1186 : * Case 3: old-row (match) new-row (no match) -> DELETE
 1187 : * Case 4: old-row (match) new row (match) -> UPDATE
 1188 : *
 1189 : * The new action is updated in the action parameter.
 1190 : *
 1191 : * The new slot could be updated when transforming the UPDATE into INSERT,
 1192 : * because the original new tuple might not have column values from the replica
 1193 : * identity.
 1194 : *
 1195 : * Examples:
 1196 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
 1197 : * Since the old tuple satisfies, the initial table synchronization copied this
 1198 : * row (or another method was used to guarantee that there is data
 1199 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
 1200 : * row filter, so from a data consistency perspective, that row should be
 1201 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
 1202 : * statement and be sent to the subscriber. Keeping this row on the subscriber
 1203 : * is undesirable because it doesn't reflect what was defined in the row filter
 1204 : * expression on the publisher. This row on the subscriber would likely not be
 1205 : * modified by replication again. If someone inserted a new row with the same
 1206 : * old identifier, replication could stop due to a constraint violation.
 1207 : *
 1208 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
 1209 : * Since the old tuple doesn't satisfy, the initial table synchronization
 1210 : * probably didn't copy this row. However, after the UPDATE the new tuple does
 1211 : * satisfy the row filter, so from a data consistency perspective, that row
 1212 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
 1213 : * statements have no effect (it matches no row -- see
 1214 : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
 1215 : * INSERT statement and be sent to the subscriber. However, this might surprise
 1216 : * someone who expects the data set to satisfy the row filter expression on the
 1217 : * provider.
 1218 : */
 1219 : static bool
411 akapila 1220 GIC 181949 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 1221 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
 1222 : ReorderBufferChangeType *action)
 1223 : {
 1224 : TupleDesc desc;
 1225 : int i;
 1226 : bool old_matched,
 1227 : new_matched,
 1228 : result;
 1229 : TupleTableSlot *tmp_new_slot;
 1230 181949 : TupleTableSlot *new_slot = *new_slot_ptr;
 1231 : ExprContext *ecxt;
 1232 : ExprState *filter_exprstate;
 1233 :
 1234 : /*
 1235 : * We need this map to avoid relying on ReorderBufferChangeType enums
 1236 : * having specific values.
 1237 : */
 1238 : static const int map_changetype_pubaction[] = {
 1239 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
 1240 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
 1241 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
 1242 : };
 1243 :
 1244 181949 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
 1245 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
 1246 : *action == REORDER_BUFFER_CHANGE_DELETE);
 1247 :
 1248 181949 : Assert(new_slot || old_slot);
 1249 :
 1250 : /* Get the corresponding row filter */
 1251 181949 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
 1252 :
 1253 : /* Bail out if there is no row filter */
 1254 181949 : if (!filter_exprstate)
 1255 181917 : return true;
 1256 :
 1257 32 : elog(DEBUG3, "table \"%s.%s\" has row filter",
 1258 : get_namespace_name(RelationGetNamespace(relation)),
 1259 : RelationGetRelationName(relation));
 1260 :
 1261 32 : ResetPerTupleExprContext(entry->estate);
 1262 :
411 akapila 1263 CBC 32 : ecxt = GetPerTupleExprContext(entry->estate);
 1264 :
 1265 : /*
 1266 : * For the following occasions where there is only one tuple, we can
 1267 : * evaluate the row filter for that tuple and return.
 1268 : *
 1269 : * For inserts, we only have the new tuple.
 1270 : *
 1271 : * For updates, we can have only a new tuple when none of the replica
 1272 : * identity columns changed and none of those columns have external data
368 akapila 1273 ECB : * but we still need to evaluate the row filter for the new tuple as the
 1274 : * existing values of those columns might not match the filter. Also,
 1275 : * users can use constant expressions in the row filter, so we anyway need
 1276 : * to evaluate it for the new tuple.
 1277 : *
 1278 : * For deletes, we only have the old tuple.
 1279 : */
411 akapila 1280 GIC 32 : if (!new_slot || !old_slot)
 1281 : {
 1282 28 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
 1283 28 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
 1284 :
 1285 28 : return result;
 1286 : }
411 akapila 1287 ECB :
 1288 : /*
 1289 : * Both the old and new tuples must be valid only for updates and need to
 1290 : * be checked against the row filter.
 1291 : */
411 akapila 1292 GIC 4 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
 1293 :
411 akapila 1294 CBC 4 : slot_getallattrs(new_slot);
411 akapila 1295 GIC 4 : slot_getallattrs(old_slot);
 1296 :
411 akapila 1297 CBC 4 : tmp_new_slot = NULL;
 1298 4 : desc = RelationGetDescr(relation);
 1299 :
411 akapila 1300 ECB : /*
 1301 : * The new tuple might not have all the replica identity columns, in which
 1302 : * case it needs to be copied over from the old tuple.
 1303 : */
411 akapila 1304 CBC 12 : for (i = 0; i < desc->natts; i++)
 1305 : {
 1306 8 : Form_pg_attribute att = TupleDescAttr(desc, i);
 1307 :
 1308 : /*
 1309 : * if the column in the new tuple or old tuple is null, nothing to do
 1310 : */
411 akapila 1311 GIC 8 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
 1312 1 : continue;
 1313 :
 1314 : /*
 1315 : * Unchanged toasted replica identity columns are only logged in the
 1316 : * old tuple. Copy this over to the new tuple. The changed (or WAL
 1317 : * Logged) toast values are always assembled in memory and set as
 1318 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
 1319 : */
 1320 7 : if (att->attlen == -1 &&
 1321 4 : VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
 1322 1 : !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
411 akapila 1323 ECB : {
411 akapila 1324 GIC 1 : if (!tmp_new_slot)
411 akapila 1325 ECB : {
411 akapila 1326 CBC 1 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
411 akapila 1327 GIC 1 : ExecClearTuple(tmp_new_slot);
411 akapila 1328 ECB :
411 akapila 1329 GIC 1 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
 1330 1 : desc->natts * sizeof(Datum));
 1331 1 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
 1332 1 : desc->natts * sizeof(bool));
 1333 : }
 1334 :
411 akapila 1335 CBC 1 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
411 akapila 1336 GIC 1 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
411 akapila 1337 ECB : }
 1338 : }
 1339 :
411 akapila 1340 CBC 4 : ecxt->ecxt_scantuple = old_slot;
 1341 4 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
 1342 :
411 akapila 1343 GIC 4 : if (tmp_new_slot)
 1344 : {
 1345 1 : ExecStoreVirtualTuple(tmp_new_slot);
 1346 1 : ecxt->ecxt_scantuple = tmp_new_slot;
411 akapila 1347 ECB : }
 1348 : else
411 akapila 1349 CBC 3 : ecxt->ecxt_scantuple = new_slot;
 1350 :
411 akapila 1351 GIC 4 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
 1352 :
 1353 : /*
411 akapila 1354 ECB : * Case 1: if both tuples don't match the row filter, bailout. Send
 1355 : * nothing.
 1356 : */
411 akapila 1357 GIC 4 : if (!old_matched && !new_matched)
411 akapila 1358 UIC 0 : return false;
 1359 :
 1360 : /*
 1361 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
 1362 : * tuple does, transform the UPDATE into INSERT.
411 akapila 1363 ECB : *
 1364 : * Use the newly transformed tuple that must contain the column values for
 1365 : * all the replica identity columns. This is required to ensure that the
 1366 : * while inserting the tuple in the downstream node, we have all the
 1367 : * required column values.
 1368 : */
411 akapila 1369 CBC 4 : if (!old_matched && new_matched)
411 akapila 1370 ECB : {
411 akapila 1371 GIC 2 : *action = REORDER_BUFFER_CHANGE_INSERT;
411 akapila 1372 ECB :
411 akapila 1373 CBC 2 : if (tmp_new_slot)
 1374 1 : *new_slot_ptr = tmp_new_slot;
411 akapila 1375 ECB : }
 1376 :
 1377 : /*
 1378 : * Case 3: if the old tuple satisfies the row filter but the new tuple
 1379 : * doesn't, transform the UPDATE into DELETE.
 1380 : *
 1381 : * This transformation does not require another tuple. The Old tuple will
 1382 : * be used for DELETE.
 1383 : */
411 akapila 1384 CBC 2 : else if (old_matched && !new_matched)
411 akapila 1385 GIC 1 : *action = REORDER_BUFFER_CHANGE_DELETE;
411 akapila 1386 ECB :
 1387 : /*
 1388 : * Case 4: if both tuples match the row filter, transformation isn't
 1389 : * required. (*action is default UPDATE).
 1390 : */
 1391 :
411 akapila 1392 CBC 4 : return true;
 1393 : }
411 akapila 1394 ECB :
 1395 : /*
 1396 : * Sends the decoded DML over wire.
 1397 : *
 1398 : * This is called both in streaming and non-streaming modes.
 : * NOTE(review): applies, in order: publishability check, pubaction filter,
 : * optional ancestor redirection (publish-via-root), tuple conversion via
 : * relentry->attrmap, row filtering (which may rewrite 'action'), deferred
 : * BEGIN, schema message, and finally the INSERT/UPDATE/DELETE message.
 1399 : */
 1400 : static void
2271 peter_e 1401 GBC 183090 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 1402 : Relation relation, ReorderBufferChange *change)
 1403 : {
2153 bruce 1404 GIC 183090 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
375 akapila 1405 183090 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 1406 : MemoryContext old;
 1407 : RelationSyncEntry *relentry;
948 1408 183090 : TransactionId xid = InvalidTransactionId;
817 1409 183090 : Relation ancestor = NULL;
411 1410 183090 : Relation targetrel = relation;
 1411 183090 : ReorderBufferChangeType action = change->action;
411 akapila 1412 CBC 183090 : TupleTableSlot *old_slot = NULL;
411 akapila 1413 GIC 183090 : TupleTableSlot *new_slot = NULL;
2271 peter_e 1414 ECB :
1871 peter_e 1415 CBC 183090 : if (!is_publishable_relation(relation))
1871 peter_e 1416 GIC 1138 : return;
 1417 :
 1418 : /*
 1419 : * Remember the xid for the change in streaming mode. We need to send xid
 1420 : * with each change in the streaming mode so that subscriber can make
 1421 : * their association and on aborts, it can discard the corresponding
 1422 : * changes.
 1423 : */
948 akapila 1424 183088 : if (in_streaming)
948 akapila 1425 CBC 175901 : xid = change->txn->xid;
948 akapila 1426 ECB :
411 akapila 1427 GIC 183088 : relentry = get_rel_sync_entry(data, relation);
 1428 :
 1429 : /* First check the table filter */
 1430 183085 : switch (action)
 1431 : {
2271 peter_e 1432 105738 : case REORDER_BUFFER_CHANGE_INSERT:
2271 peter_e 1433 CBC 105738 : if (!relentry->pubactions.pubinsert)
2271 peter_e 1434 GIC 41 : return;
 1435 105697 : break;
 1436 34446 : case REORDER_BUFFER_CHANGE_UPDATE:
 1437 34446 : if (!relentry->pubactions.pubupdate)
 1438 41 : return;
 1439 34405 : break;
 1440 42901 : case REORDER_BUFFER_CHANGE_DELETE:
 1441 42901 : if (!relentry->pubactions.pubdelete)
2271 peter_e 1442 CBC 1054 : return;
 1443 :
 1444 : /*
 1445 : * This is only possible if deletes are allowed even when replica
 1446 : * identity is not defined for a table. Since the DELETE action
 1447 : * can't be published, we simply return.
 1448 : */
10 akapila 1449 GNC 41847 : if (!change->data.tp.oldtuple)
 1450 : {
10 akapila 1451 UNC 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple")
 1452 0 : return;
 1453 : }
2271 peter_e 1454 GIC 41847 : break;
2271 peter_e 1455 UIC 0 : default:
2271 peter_e 1456 LBC 0 : Assert(false);
2271 peter_e 1457 ECB : }
 1458 :
 1459 : /* Avoid leaking memory by using and resetting our own context */
2271 peter_e 1460 CBC 181949 : old = MemoryContextSwitchTo(data->context);
2271 peter_e 1461 ECB :
 1462 : /* Switch relation if publishing via root. */
10 akapila 1463 GNC 181949 : if (relentry->publish_as_relid != RelationGetRelid(relation))
 1464 : {
 1465 47 : Assert(relation->rd_rel->relispartition);
 1466 47 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
 1467 47 : targetrel = ancestor;
 1468 : }
 1469 :
 1470 181949 : if (change->data.tp.oldtuple)
 1471 : {
 1472 41957 : old_slot = relentry->old_slot;
 1473 41957 : ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 1474 :
 1475 : /* Convert tuple if needed. */
 1476 41957 : if (relentry->attrmap)
 1477 : {
10 akapila 1478 UNC 0 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
 1479 : &TTSOpsVirtual);
 1480 :
 1481 0 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
 1482 : }
 1483 : }
 1484 :
10 akapila 1485 GNC 181949 : if (change->data.tp.newtuple)
 1486 : {
 1487 140102 : new_slot = relentry->new_slot;
 1488 140102 : ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 1489 :
 1490 : /* Convert tuple if needed. */
 1491 140102 : if (relentry->attrmap)
 1492 : {
 1493 8 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
 1494 : &TTSOpsVirtual);
 1495 :
 1496 8 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
 1497 : }
 1498 : }
 1499 :
 1500 : /*
 1501 : * Check row filter.
 1502 : *
 1503 : * Updates could be transformed to inserts or deletes based on the results
 1504 : * of the row filter for old and new tuple.
 1505 : */
 1506 181949 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
 1507 11 : goto cleanup;
 1508 :
 1509 : /*
 1510 : * Send BEGIN if we haven't yet.
 1511 : *
 1512 : * We send the BEGIN message after ensuring that we will actually send the
 1513 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
 1514 : * transactions.
 1515 : */
 1516 181938 : if (txndata && !txndata->sent_begin_txn)
 1517 326 : pgoutput_send_begin(ctx, txn);
 1518 :
 1519 : /*
 1520 : * Schema should be sent using the original relation because it also sends
 1521 : * the ancestor's relation.
 1522 : */
 1523 181938 : maybe_send_schema(ctx, change, relation, relentry);
 1524 :
 1525 181938 : OutputPluginPrepareWrite(ctx, true);
 1526 :
10 akapila 1527 ECB : /* Send the data */
10 akapila 1528 CBC 181938 : switch (action)
10 akapila 1529 ECB : {
10 akapila 1530 CBC 105688 : case REORDER_BUFFER_CHANGE_INSERT:
10 akapila 1531 GIC 105688 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 1532 105688 : data->binary, relentry->columns);
10 akapila 1533 CBC 105688 : break;
10 akapila 1534 GIC 34402 : case REORDER_BUFFER_CHANGE_UPDATE:
10 akapila 1535 GNC 34402 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
 1536 34402 : new_slot, data->binary, relentry->columns);
411 akapila 1537 CBC 34402 : break;
2271 peter_e 1538 41848 : case REORDER_BUFFER_CHANGE_DELETE:
10 akapila 1539 GNC 41848 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
 1540 41848 : data->binary, relentry->columns);
2271 peter_e 1541 CBC 41848 : break;
2271 peter_e 1542 UIC 0 : default:
2271 peter_e 1543 LBC 0 : Assert(false);
 1544 : }
2271 peter_e 1545 ECB :
10 akapila 1546 GNC 181938 : OutputPluginWrite(ctx, true);
 1547 :
 1548 181948 : cleanup:
817 akapila 1549 CBC 181948 : if (RelationIsValid(ancestor))
 1550 : {
 1551 47 : RelationClose(ancestor);
817 akapila 1552 GBC 47 : ancestor = NULL;
 1553 : }
817 akapila 1554 ECB :
2271 peter_e 1555 CBC 181948 : MemoryContextSwitchTo(old);
 1556 181948 : MemoryContextReset(data->context);
 1557 : }
1558 :
/*
 * TRUNCATE callback: sends a TRUNCATE message for the publishable relations
 * among those truncated, after filtering on the entries' pubtruncate action
 * and the publish-via-root setting.
 */
static void
pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				  int nrelations, Relation relations[], ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	int			i;
	int			nrelids;
	Oid		   *relids;
	TransactionId xid = InvalidTransactionId;

	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
	if (in_streaming)
		xid = change->txn->xid;

	/* Do all the allocations below in the per-change context. */
	old = MemoryContextSwitchTo(data->context);

	relids = palloc0(nrelations * sizeof(Oid));
	nrelids = 0;

	/* Collect the OIDs that actually get published. */
	for (i = 0; i < nrelations; i++)
	{
		Relation	relation = relations[i];
		Oid			relid = RelationGetRelid(relation);

		if (!is_publishable_relation(relation))
			continue;

		relentry = get_rel_sync_entry(data, relation);

		/* Skip relations whose publications don't publish TRUNCATE. */
		if (!relentry->pubactions.pubtruncate)
			continue;

		/*
		 * Don't send partitions if the publication wants to send only the
		 * root tables through it.
		 */
		if (relation->rd_rel->relispartition &&
			relentry->publish_as_relid != relid)
			continue;

		relids[nrelids++] = relid;

		/* Send BEGIN if we haven't yet */
		if (txndata && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);

		maybe_send_schema(ctx, change, relation, relentry);
	}

	/* Nothing to send if every relation was filtered out. */
	if (nrelids > 0)
	{
		OutputPluginPrepareWrite(ctx, true);
		logicalrep_write_truncate(ctx->out,
								  xid,
								  nrelids,
								  relids,
								  change->data.truncate.cascade,
								  change->data.truncate.restart_seqs);
		OutputPluginWrite(ctx, true);
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
1626 :
733 akapila 1627 ECB : static void
733 akapila 1628 GIC 6 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1629 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1630 : const char *message)
1631 : {
1632 6 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1633 6 : TransactionId xid = InvalidTransactionId;
1634 :
1635 6 : if (!data->messages)
733 akapila 1636 CBC 1 : return;
733 akapila 1637 ECB :
1638 : /*
1639 : * Remember the xid for the message in streaming mode. See
1640 : * pgoutput_change.
1641 : */
733 akapila 1642 GIC 5 : if (in_streaming)
733 akapila 1643 UIC 0 : xid = txn->xid;
1644 :
1645 : /*
1646 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1647 : */
375 akapila 1648 GIC 5 : if (transactional)
1649 : {
375 akapila 1650 CBC 2 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1651 :
375 akapila 1652 ECB : /* Send BEGIN if we haven't yet */
375 akapila 1653 GIC 2 : if (txndata && !txndata->sent_begin_txn)
375 akapila 1654 CBC 2 : pgoutput_send_begin(ctx, txn);
375 akapila 1655 ECB : }
1656 :
733 akapila 1657 CBC 5 : OutputPluginPrepareWrite(ctx, true);
733 akapila 1658 GIC 5 : logicalrep_write_message(ctx->out,
1659 : xid,
1660 : message_lsn,
1661 : transactional,
1662 : prefix,
733 akapila 1663 ECB : sz,
1664 : message);
733 akapila 1665 CBC 5 : OutputPluginWrite(ctx, true);
1666 : }
1667 :
2271 peter_e 1668 ECB : /*
1669 : * Return true if the data is associated with an origin and the user has
1670 : * requested the changes that don't have an origin, false otherwise.
1671 : */
1672 : static bool
2271 peter_e 1673 GIC 490622 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
2153 bruce 1674 ECB : RepOriginId origin_id)
1675 : {
262 akapila 1676 GNC 490622 : if (publish_no_origin && origin_id != InvalidRepOriginId)
1677 46 : return true;
1678 :
2271 peter_e 1679 GIC 490576 : return false;
2271 peter_e 1680 ECB : }
1681 :
1682 : /*
1683 : * Shutdown the output plugin.
1684 : *
1685 : * Note, we don't need to clean the data->context and data->cachectx as
1686 : * they are child contexts of the ctx->context so they will be cleaned up by
1687 : * logical decoding machinery.
1688 : */
1689 : static void
2153 bruce 1690 GIC 384 : pgoutput_shutdown(LogicalDecodingContext *ctx)
2271 peter_e 1691 ECB : {
2271 peter_e 1692 GIC 384 : if (RelationSyncCache)
1693 : {
1694 154 : hash_destroy(RelationSyncCache);
1695 154 : RelationSyncCache = NULL;
1696 : }
2271 peter_e 1697 CBC 384 : }
2271 peter_e 1698 ECB :
1699 : /*
1700 : * Load publications from the list of publication names.
1701 : */
1702 : static List *
2271 peter_e 1703 GIC 128 : LoadPublications(List *pubnames)
2271 peter_e 1704 ECB : {
2271 peter_e 1705 GIC 128 : List *result = NIL;
1706 : ListCell *lc;
2271 peter_e 1707 ECB :
2153 bruce 1708 GIC 297 : foreach(lc, pubnames)
1709 : {
2153 bruce 1710 CBC 171 : char *pubname = (char *) lfirst(lc);
2153 bruce 1711 GIC 171 : Publication *pub = GetPublicationByName(pubname, false);
1712 :
2271 peter_e 1713 169 : result = lappend(result, pub);
1714 : }
1715 :
2271 peter_e 1716 CBC 126 : return result;
2271 peter_e 1717 ECB : }
1718 :
1719 : /*
429 akapila 1720 : * Publication syscache invalidation callback.
1721 : *
1722 : * Called for invalidations on pg_publication.
1723 : */
static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	/* Force the publication list to be rebuilt on next use. */
	publications_valid = false;

	/*
	 * Also invalidate per-relation cache so that next time the filtering info
	 * is checked it will be updated with the new publication settings.
	 */
	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
2271 peter_e 1735 ECB :
1736 : /*
1737 : * START STREAM callback
1738 : */
948 akapila 1739 : static void
948 akapila 1740 GIC 595 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
948 akapila 1741 ECB : ReorderBufferTXN *txn)
1742 : {
948 akapila 1743 CBC 595 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1744 :
1745 : /* we can't nest streaming of transactions */
1746 595 : Assert(!in_streaming);
948 akapila 1747 ECB :
1748 : /*
1749 : * If we already sent the first stream for this transaction then don't
1750 : * send the origin id in the subsequent streams.
1751 : */
948 akapila 1752 GIC 595 : if (rbtxn_is_streamed(txn))
1753 540 : send_replication_origin = false;
948 akapila 1754 ECB :
948 akapila 1755 GIC 595 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1756 595 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1757 :
634 1758 595 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
634 akapila 1759 ECB : send_replication_origin);
948 1760 :
948 akapila 1761 GIC 595 : OutputPluginWrite(ctx, true);
1762 :
1763 : /* we're streaming a chunk of transaction now */
1764 595 : in_streaming = true;
1765 595 : }
948 akapila 1766 ECB :
1767 : /*
1768 : * STOP STREAM callback
1769 : */
static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
					 ReorderBufferTXN *txn)
{
	/* we should be streaming a transaction */
	Assert(in_streaming);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_stop(ctx->out);
	OutputPluginWrite(ctx, true);

	/* we've stopped streaming a transaction */
	in_streaming = false;
}
1784 :
1785 : /*
1786 : * Notify downstream to discard the streamed transaction (along with all
948 akapila 1787 ECB : * it's subtransactions, if it's a toplevel transaction).
1788 : */
static void
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn,
					  XLogRecPtr abort_lsn)
{
	ReorderBufferTXN *toptxn;
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	/* abort time is included only when streaming in parallel mode */
	bool		write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);

	/*
	 * The abort should happen outside streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */
	Assert(!in_streaming);

	/* determine the toplevel transaction */
	toptxn = rbtxn_get_toptxn(txn);

	Assert(rbtxn_is_streamed(toptxn));

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
								  txn->xact_time.abort_time, write_abort_info);

	OutputPluginWrite(ctx, true);

	/* aborted: drop the toplevel xid's schema-sent bookkeeping (is_commit = false) */
	cleanup_rel_sync_cache(toptxn->xid, false);
}
948 akapila 1817 ECB :
1818 : /*
1819 : * Notify downstream to apply the streamed transaction (along with all
1820 : * it's subtransactions).
1821 : */
static void
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
					   ReorderBufferTXN *txn,
					   XLogRecPtr commit_lsn)
{
	/*
	 * The commit should happen outside streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */
	Assert(!in_streaming);
	Assert(rbtxn_is_streamed(txn));

	/* update replication progress tracking */
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);

	/* committed: mark the xid's streamed schemas as sent (is_commit = true) */
	cleanup_rel_sync_cache(txn->xid, true);
}
1842 :
613 akapila 1843 ECB : /*
613 akapila 1844 EUB : * PREPARE callback (for streaming two-phase commit).
1845 : *
1846 : * Notify the downstream to prepare the transaction.
613 akapila 1847 ECB : */
static void
pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
							ReorderBufferTXN *txn,
							XLogRecPtr prepare_lsn)
{
	/* only a previously streamed transaction can be stream-prepared */
	Assert(rbtxn_is_streamed(txn));

	/* update replication progress tracking */
	OutputPluginUpdateProgress(ctx, false);
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
	OutputPluginWrite(ctx, true);
}
1860 :
1861 : /*
2271 peter_e 1862 ECB : * Initialize the relation schema sync cache for a decoding session.
1863 : *
1864 : * The hash table is destroyed at the end of a decoding session. While
1865 : * relcache invalidations still exist and will still be invoked, they
1866 : * will just see the null hash table global and take no action.
1867 : */
static void
init_rel_sync_cache(MemoryContext cachectx)
{
	HASHCTL		ctl;
	/* static: invalidation callbacks cannot be unregistered, so register once */
	static bool relation_callbacks_registered = false;

	/* Nothing to do if hash table already exists */
	if (RelationSyncCache != NULL)
		return;

	/* Make a new hash table for the cache */
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelationSyncEntry);
	ctl.hcxt = cachectx;

	RelationSyncCache = hash_create("logical replication output relation cache",
									128, &ctl,
									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	Assert(RelationSyncCache != NULL);

	/* No more to do if we already registered callbacks */
	if (relation_callbacks_registered)
		return;

	/* We must update the cache entry for a relation after a relcache flush */
	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);

	/*
	 * Flush all cache entries after a pg_namespace change, in case it was a
	 * schema rename affecting a relation being replicated.
	 */
	CacheRegisterSyscacheCallback(NAMESPACEOID,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);

	/*
	 * Flush all cache entries after any publication changes.  (We need no
	 * callback entry for pg_publication, because publication_invalidation_cb
	 * will take care of it.)
	 */
	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);
	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);

	relation_callbacks_registered = true;
}
1918 :
1919 : /*
948 akapila 1920 ECB : * We expect relatively small number of streamed transactions.
1921 : */
1922 : static bool
948 akapila 1923 GIC 175901 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
948 akapila 1924 ECB : {
279 alvherre 1925 GNC 175901 : return list_member_xid(entry->streamed_txns, xid);
1926 : }
1927 :
1928 : /*
948 akapila 1929 ECB : * Add the xid in the rel sync entry for which we have already sent the schema
1930 : * of the relation.
1931 : */
1932 : static void
948 akapila 1933 GIC 64 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
948 akapila 1934 ECB : {
1935 : MemoryContext oldctx;
1936 :
948 akapila 1937 GIC 64 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1938 :
279 alvherre 1939 GNC 64 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1940 :
948 akapila 1941 GIC 64 : MemoryContextSwitchTo(oldctx);
948 akapila 1942 CBC 64 : }
1943 :
1944 : /*
2271 peter_e 1945 ECB : * Find or create entry in the relation schema cache.
1946 : *
1125 peter 1947 : * This looks up publications that the given relation is directly or
1948 : * indirectly part of (the latter if it's really the relation's ancestor that
1949 : * is part of a publication) and fills up the found entry with the information
1096 1950 : * about which operations to publish and whether to use an ancestor's schema
1951 : * when publishing.
2271 peter_e 1952 : */
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
{
	RelationSyncEntry *entry;
	bool		found;
	MemoryContext oldctx;
	Oid			relid = RelationGetRelid(relation);

	Assert(RelationSyncCache != NULL);

	/* Find cached relation info, creating if not found */
	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
											  &relid,
											  HASH_ENTER, &found);
	Assert(entry != NULL);

	/* initialize entry, if it's new (hash_search does not zero new entries) */
	if (!found)
	{
		entry->replicate_valid = false;
		entry->schema_sent = false;
		entry->streamed_txns = NIL;
		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
		entry->new_slot = NULL;
		entry->old_slot = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));
		entry->entry_cxt = NULL;
		entry->publish_as_relid = InvalidOid;
		entry->columns = NULL;
		entry->attrmap = NULL;
	}

	/* Validate the entry (rebuild it if an invalidation marked it stale) */
	if (!entry->replicate_valid)
	{
		Oid			schemaId = get_rel_namespace(relid);
		List	   *pubids = GetRelationPublications(relid);

		/*
		 * We don't acquire a lock on the namespace system table as we build
		 * the cache entry using a historic snapshot and all the later changes
		 * are absorbed while decoding WAL.
		 */
		List	   *schemaPubids = GetSchemaPublications(schemaId);
		ListCell   *lc;
		Oid			publish_as_relid = relid;
		int			publish_ancestor_level = 0;
		bool		am_partition = get_rel_relispartition(relid);
		char		relkind = get_rel_relkind(relid);
		List	   *rel_publications = NIL;

		/* Reload publications if needed before use. */
		if (!publications_valid)
		{
			/* The publication list must outlive this call. */
			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
			if (data->publications)
			{
				list_free_deep(data->publications);
				data->publications = NIL;
			}
			data->publications = LoadPublications(data->publication_names);
			MemoryContextSwitchTo(oldctx);
			publications_valid = true;
		}

		/*
		 * Reset schema_sent status as the relation definition may have
		 * changed. Also reset pubactions to empty in case rel was dropped
		 * from a publication. Also free any objects that depended on the
		 * earlier definition.
		 */
		entry->schema_sent = false;
		list_free(entry->streamed_txns);
		entry->streamed_txns = NIL;
		bms_free(entry->columns);
		entry->columns = NULL;
		entry->pubactions.pubinsert = false;
		entry->pubactions.pubupdate = false;
		entry->pubactions.pubdelete = false;
		entry->pubactions.pubtruncate = false;

		/*
		 * Tuple slots cleanups. (Will be rebuilt later if needed).
		 */
		if (entry->old_slot)
			ExecDropSingleTupleTableSlot(entry->old_slot);
		if (entry->new_slot)
			ExecDropSingleTupleTableSlot(entry->new_slot);

		entry->old_slot = NULL;
		entry->new_slot = NULL;

		if (entry->attrmap)
			free_attrmap(entry->attrmap);
		entry->attrmap = NULL;

		/*
		 * Row filter cache cleanups.
		 */
		if (entry->entry_cxt)
			MemoryContextDelete(entry->entry_cxt);

		entry->entry_cxt = NULL;
		entry->estate = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));

		/*
		 * Build publication cache. We can't use one provided by relcache as
		 * relcache considers all publications that the given relation is in,
		 * but here we only need to consider ones that the subscriber
		 * requested.
		 */
		foreach(lc, data->publications)
		{
			Publication *pub = lfirst(lc);
			bool		publish = false;

			/*
			 * Under what relid should we publish changes in this publication?
			 * We'll use the top-most relid across all publications. Also
			 * track the ancestor level for this publication.
			 */
			Oid			pub_relid = relid;
			int			ancestor_level = 0;

			/*
			 * If this is a FOR ALL TABLES publication, pick the partition
			 * root and set the ancestor level accordingly.
			 */
			if (pub->alltables)
			{
				publish = true;
				if (pub->pubviaroot && am_partition)
				{
					List	   *ancestors = get_partition_ancestors(relid);

					/* topmost ancestor is last in the list */
					pub_relid = llast_oid(ancestors);
					ancestor_level = list_length(ancestors);
				}
			}

			if (!publish)
			{
				bool		ancestor_published = false;

				/*
				 * For a partition, check if any of the ancestors are
				 * published. If so, note down the topmost ancestor that is
				 * published via this publication, which will be used as the
				 * relation via which to publish the partition's changes.
				 */
				if (am_partition)
				{
					Oid			ancestor;
					int			level;
					List	   *ancestors = get_partition_ancestors(relid);

					ancestor = GetTopMostAncestorInPublication(pub->oid,
															   ancestors,
															   &level);

					if (ancestor != InvalidOid)
					{
						ancestor_published = true;
						if (pub->pubviaroot)
						{
							pub_relid = ancestor;
							ancestor_level = level;
						}
					}
				}

				if (list_member_oid(pubids, pub->oid) ||
					list_member_oid(schemaPubids, pub->oid) ||
					ancestor_published)
					publish = true;
			}

			/*
			 * If the relation is to be published, determine actions to
			 * publish, and list of columns, if appropriate.
			 *
			 * Don't publish changes for partitioned tables, because
			 * publishing those of its partitions suffices, unless partition
			 * changes won't be published due to pubviaroot being set.
			 */
			if (publish &&
				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
			{
				/* actions accumulate (OR) across all matching publications */
				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;

				/*
				 * We want to publish the changes as the top-most ancestor
				 * across all publications. So we need to check if the already
				 * calculated level is higher than the new one. If yes, we can
				 * ignore the new value (as it's a child). Otherwise the new
				 * value is an ancestor, so we keep it.
				 */
				if (publish_ancestor_level > ancestor_level)
					continue;

				/*
				 * If we found an ancestor higher up in the tree, discard the
				 * list of publications through which we replicate it, and use
				 * the new ancestor.
				 */
				if (publish_ancestor_level < ancestor_level)
				{
					publish_as_relid = pub_relid;
					publish_ancestor_level = ancestor_level;

					/* reset the publication list for this relation */
					rel_publications = NIL;
				}
				else
				{
					/* Same ancestor level, has to be the same OID. */
					Assert(publish_as_relid == pub_relid);
				}

				/* Track publications for this ancestor. */
				rel_publications = lappend(rel_publications, pub);
			}
		}

		entry->publish_as_relid = publish_as_relid;

		/*
		 * Initialize the tuple slot, map, and row filter. These are only used
		 * when publishing inserts, updates, or deletes.
		 */
		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
			entry->pubactions.pubdelete)
		{
			/* Initialize the tuple slot and map */
			init_tuple_slot(data, relation, entry);

			/* Initialize the row filter */
			pgoutput_row_filter_init(data, rel_publications, entry);

			/* Initialize the column list */
			pgoutput_column_list_init(data, rel_publications, entry);
		}

		list_free(pubids);
		list_free(schemaPubids);
		list_free(rel_publications);

		entry->replicate_valid = true;
	}

	return entry;
}
2210 :
2211 : /*
2212 : * Cleanup list of streamed transactions and update the schema_sent flag.
2213 : *
2214 : * When a streamed transaction commits or aborts, we need to remove the
948 akapila 2215 ECB : * toplevel XID from the schema cache. If the transaction aborted, the
2216 : * subscriber will simply throw away the schema records we streamed, so
2217 : * we don't need to do anything else.
2218 : *
2219 : * If the transaction is committed, the subscriber will update the relation
2220 : * cache - so tweak the schema_sent flag accordingly.
2221 : */
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
	HASH_SEQ_STATUS hash_seq;
	RelationSyncEntry *entry;
	ListCell   *lc;

	Assert(RelationSyncCache != NULL);

	/* Scan every cache entry; each may reference the finished xid. */
	hash_seq_init(&hash_seq, RelationSyncCache);
	while ((entry = hash_seq_search(&hash_seq)) != NULL)
	{
		/*
		 * We can set the schema_sent flag for an entry that has committed xid
		 * in the list as that ensures that the subscriber would have the
		 * corresponding schema and we don't need to send it unless there is
		 * any invalidation for that relation.
		 */
		foreach(lc, entry->streamed_txns)
		{
			if (xid == lfirst_xid(lc))
			{
				if (is_commit)
					entry->schema_sent = true;

				entry->streamed_txns =
					foreach_delete_current(entry->streamed_txns, lc);
				/* the xid appears at most once per entry */
				break;
			}
		}
	}
}
948 akapila 2254 ECB :
2255 : /*
2256 : * Relcache invalidation callback
2257 : */
2258 : static void
2271 peter_e 2259 GIC 3135 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2271 peter_e 2260 ECB : {
2261 : RelationSyncEntry *entry;
2262 :
2263 : /*
2264 : * We can get here if the plugin was used in SQL interface as the
2265 : * RelSchemaSyncCache is destroyed when the decoding finishes, but there
2266 : * is no way to unregister the relcache invalidation callback.
2267 : */
2271 peter_e 2268 GIC 3135 : if (RelationSyncCache == NULL)
2269 6 : return;
2271 peter_e 2270 ECB :
2271 : /*
2272 : * Nobody keeps pointers to entries in this hash table around outside
2273 : * logical decoding callback calls - but invalidation events can come in
2274 : * *during* a callback if we do any syscache access in the callback.
2275 : * Because of that we must mark the cache entry as invalid but not damage
2276 : * any of its substructure here. The next get_rel_sync_entry() call will
2277 : * rebuild it all.
2278 : */
429 akapila 2279 GIC 3129 : if (OidIsValid(relid))
2280 : {
2281 : /*
2282 : * Getting invalidations for relations that aren't in the table is
2283 : * entirely normal. So we don't care if it's found or not.
2284 : */
429 akapila 2285 CBC 3106 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2286 : HASH_FIND, NULL);
429 akapila 2287 GIC 3106 : if (entry != NULL)
2288 445 : entry->replicate_valid = false;
2289 : }
2290 : else
2291 : {
2292 : /* Whole cache must be flushed. */
2293 : HASH_SEQ_STATUS status;
2294 :
429 akapila 2295 CBC 23 : hash_seq_init(&status, RelationSyncCache);
2296 53 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2297 : {
429 akapila 2298 GIC 30 : entry->replicate_valid = false;
2299 : }
2300 : }
2301 : }
2271 peter_e 2302 ECB :
2303 : /*
2304 : * Publication relation/schema map syscache invalidation callback
429 akapila 2305 : *
2306 : * Called for invalidations on pg_publication, pg_publication_rel,
2307 : * pg_publication_namespace, and pg_namespace.
2308 : */
2309 : static void
2271 peter_e 2310 GIC 638 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2271 peter_e 2311 ECB : {
2312 : HASH_SEQ_STATUS status;
2313 : RelationSyncEntry *entry;
2314 :
2315 : /*
2316 : * We can get here if the plugin was used in SQL interface as the
2317 : * RelSchemaSyncCache is destroyed when the decoding finishes, but there
2318 : * is no way to unregister the invalidation callbacks.
2319 : */
2271 peter_e 2320 GIC 638 : if (RelationSyncCache == NULL)
2321 24 : return;
2322 :
2323 : /*
2324 : * We have no easy way to identify which cache entries this invalidation
2325 : * event might have affected, so just mark them all invalid.
2326 : */
2327 614 : hash_seq_init(&status, RelationSyncCache);
2271 peter_e 2328 CBC 1819 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2329 : {
2271 peter_e 2330 GIC 1205 : entry->replicate_valid = false;
804 akapila 2331 ECB : }
2271 peter_e 2332 : }
2333 :
634 akapila 2334 : /* Send Replication origin */
2335 : static void
634 akapila 2336 GIC 948 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
634 akapila 2337 ECB : XLogRecPtr origin_lsn, bool send_origin)
2338 : {
634 akapila 2339 GIC 948 : if (send_origin)
2340 : {
2341 : char *origin;
2342 :
2343 : /*----------
2344 : * XXX: which behaviour do we want here?
2345 : *
2346 : * Alternatives:
2347 : * - don't send origin message if origin name not found
2348 : * (that's what we do now)
2349 : * - throw error - that will break replication, not good
2350 : * - send some special "unknown" origin
2351 : *----------
2352 : */
2353 7 : if (replorigin_by_oid(origin_id, true, &origin))
2354 : {
2355 : /* Message boundary */
2356 7 : OutputPluginWrite(ctx, false);
2357 7 : OutputPluginPrepareWrite(ctx, true);
2358 :
2359 7 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2360 : }
2361 : }
2362 948 : }
|