Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pgoutput.c
4 : : * Logical Replication output plugin
5 : : *
6 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/pgoutput/pgoutput.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/tupconvert.h"
16 : : #include "catalog/partition.h"
17 : : #include "catalog/pg_publication.h"
18 : : #include "catalog/pg_publication_rel.h"
19 : : #include "catalog/pg_subscription.h"
20 : : #include "commands/defrem.h"
21 : : #include "commands/subscriptioncmds.h"
22 : : #include "executor/executor.h"
23 : : #include "fmgr.h"
24 : : #include "nodes/makefuncs.h"
25 : : #include "parser/parse_relation.h"
26 : : #include "replication/logical.h"
27 : : #include "replication/logicalproto.h"
28 : : #include "replication/origin.h"
29 : : #include "replication/pgoutput.h"
30 : : #include "utils/builtins.h"
31 : : #include "utils/inval.h"
32 : : #include "utils/lsyscache.h"
33 : : #include "utils/memutils.h"
34 : : #include "utils/rel.h"
35 : : #include "utils/syscache.h"
36 : : #include "utils/varlena.h"
37 : :
2642 peter_e@gmx.net 38 :CBC 430 : PG_MODULE_MAGIC;
39 : :
40 : : static void pgoutput_startup(LogicalDecodingContext *ctx,
41 : : OutputPluginOptions *opt, bool is_init);
42 : : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
43 : : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
44 : : ReorderBufferTXN *txn);
45 : : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
46 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
47 : : static void pgoutput_change(LogicalDecodingContext *ctx,
48 : : ReorderBufferTXN *txn, Relation relation,
49 : : ReorderBufferChange *change);
50 : : static void pgoutput_truncate(LogicalDecodingContext *ctx,
51 : : ReorderBufferTXN *txn, int nrelations, Relation relations[],
52 : : ReorderBufferChange *change);
53 : : static void pgoutput_message(LogicalDecodingContext *ctx,
54 : : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
55 : : bool transactional, const char *prefix,
56 : : Size sz, const char *message);
57 : : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
58 : : RepOriginId origin_id);
59 : : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
60 : : ReorderBufferTXN *txn);
61 : : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
62 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
63 : : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
64 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65 : : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
66 : : ReorderBufferTXN *txn,
67 : : XLogRecPtr prepare_end_lsn,
68 : : TimestampTz prepare_time);
69 : : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
70 : : ReorderBufferTXN *txn);
71 : : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
72 : : ReorderBufferTXN *txn);
73 : : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
74 : : ReorderBufferTXN *txn,
75 : : XLogRecPtr abort_lsn);
76 : : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
77 : : ReorderBufferTXN *txn,
78 : : XLogRecPtr commit_lsn);
79 : : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
80 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
81 : :
82 : : static bool publications_valid;
83 : :
84 : : static List *LoadPublications(List *pubnames);
85 : : static void publication_invalidation_cb(Datum arg, int cacheid,
86 : : uint32 hashvalue);
87 : : static void send_relation_and_attrs(Relation relation, TransactionId xid,
88 : : LogicalDecodingContext *ctx,
89 : : Bitmapset *columns);
90 : : static void send_repl_origin(LogicalDecodingContext *ctx,
91 : : RepOriginId origin_id, XLogRecPtr origin_lsn,
92 : : bool send_origin);
93 : :
94 : : /*
95 : : * Only 3 publication actions are used for row filtering ("insert", "update",
96 : : * "delete"). See RelationSyncEntry.exprstate[].
97 : : */
98 : : enum RowFilterPubAction
99 : : {
100 : : PUBACTION_INSERT,
101 : : PUBACTION_UPDATE,
102 : : PUBACTION_DELETE,
103 : : };
104 : :
105 : : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
106 : :
107 : : /*
108 : : * Entry in the map used to remember which relation schemas we sent.
109 : : *
110 : : * The schema_sent flag determines if the current schema record for the
111 : : * relation (and for its ancestor if publish_as_relid is set) was already
112 : : * sent to the subscriber (in which case we don't need to send it again).
113 : : *
114 : : * The schema cache on downstream is however updated only at commit time,
115 : : * and with streamed transactions the commit order may be different from
116 : : * the order the transactions are sent in. Also, the (sub) transactions
117 : : * might get aborted so we need to send the schema for each (sub) transaction
118 : : * so that we don't lose the schema information on abort. For handling this,
119 : : * we maintain the list of xids (streamed_txns) for those we have already sent
120 : : * the schema.
121 : : *
122 : : * For partitions, 'pubactions' considers not only the table's own
123 : : * publications, but also those of all of its ancestors.
124 : : */
125 : : typedef struct RelationSyncEntry
126 : : {
127 : : Oid relid; /* relation oid */
128 : :
129 : : bool replicate_valid; /* overall validity flag for entry */
130 : :
131 : : bool schema_sent;
132 : : List *streamed_txns; /* streamed toplevel transactions with this
133 : : * schema */
134 : :
135 : : /* are we publishing this rel? */
136 : : PublicationActions pubactions;
137 : :
138 : : /*
139 : : * ExprState array for row filter. Different publication actions don't
140 : : * allow multiple expressions to always be combined into one, because
141 : : * updates or deletes restrict the column in expression to be part of the
142 : : * replica identity index whereas inserts do not have this restriction, so
143 : : * there is one ExprState per publication action.
144 : : */
145 : : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
146 : : EState *estate; /* executor state used for row filter */
147 : : TupleTableSlot *new_slot; /* slot for storing new tuple */
148 : : TupleTableSlot *old_slot; /* slot for storing old tuple */
149 : :
150 : : /*
151 : : * OID of the relation to publish changes as. For a partition, this may
152 : : * be set to one of its ancestors whose schema will be used when
153 : : * replicating changes, if publish_via_partition_root is set for the
154 : : * publication.
155 : : */
156 : : Oid publish_as_relid;
157 : :
158 : : /*
159 : : * Map used when replicating using an ancestor's schema to convert tuples
160 : : * from partition's type to the ancestor's; NULL if publish_as_relid is
161 : : * same as 'relid' or if unnecessary due to partition and the ancestor
162 : : * having identical TupleDesc.
163 : : */
164 : : AttrMap *attrmap;
165 : :
166 : : /*
167 : : * Columns included in the publication, or NULL if all columns are
168 : : * included implicitly. Note that the attnums in this bitmap are not
169 : : * shifted by FirstLowInvalidHeapAttributeNumber.
170 : : */
171 : : Bitmapset *columns;
172 : :
173 : : /*
174 : : * Private context to store additional data for this entry - state for the
175 : : * row filter expressions, column list, etc.
176 : : */
177 : : MemoryContext entry_cxt;
178 : : } RelationSyncEntry;
179 : :
180 : : /*
181 : : * Maintain a per-transaction level variable to track whether the transaction
182 : : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
183 : : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
184 : : * messages for empty transactions which saves network bandwidth.
185 : : *
186 : : * This optimization is not used for prepared transactions because if the
187 : : * WALSender restarts after prepare of a transaction and before commit prepared
188 : : * of the same transaction then we won't be able to figure out if we have
189 : : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
190 : : * because we would have lost the in-memory txndata information that was
191 : : * present prior to the restart. This will result in sending a spurious
192 : : * COMMIT PREPARED without a corresponding prepared transaction at the
193 : : * downstream which would lead to an error when it tries to process it.
194 : : *
195 : : * XXX We could achieve this optimization by changing protocol to send
196 : : * additional information so that downstream can detect that the corresponding
197 : : * prepare has not been sent. However, adding such a check for every
198 : : * transaction in the downstream could be costly so we might want to do it
199 : : * optionally.
200 : : *
201 : : * We also don't have this optimization for streamed transactions because
202 : : * they can contain prepared transactions.
203 : : */
204 : : typedef struct PGOutputTxnData
205 : : {
206 : : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
207 : : } PGOutputTxnData;
208 : :
209 : : /* Map used to remember which relation schemas we sent. */
210 : : static HTAB *RelationSyncCache = NULL;
211 : :
212 : : static void init_rel_sync_cache(MemoryContext cachectx);
213 : : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
214 : : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
215 : : Relation relation);
216 : : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
217 : : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
218 : : uint32 hashvalue);
219 : : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
220 : : TransactionId xid);
221 : : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
222 : : TransactionId xid);
223 : : static void init_tuple_slot(PGOutputData *data, Relation relation,
224 : : RelationSyncEntry *entry);
225 : :
226 : : /* row filter routines */
227 : : static EState *create_estate_for_relation(Relation rel);
228 : : static void pgoutput_row_filter_init(PGOutputData *data,
229 : : List *publications,
230 : : RelationSyncEntry *entry);
231 : : static bool pgoutput_row_filter_exec_expr(ExprState *state,
232 : : ExprContext *econtext);
233 : : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
234 : : TupleTableSlot **new_slot_ptr,
235 : : RelationSyncEntry *entry,
236 : : ReorderBufferChangeType *action);
237 : :
238 : : /* column list routines */
239 : : static void pgoutput_column_list_init(PGOutputData *data,
240 : : List *publications,
241 : : RelationSyncEntry *entry);
242 : :
243 : : /*
244 : : * Specify output plugin callbacks
245 : : */
246 : : void
247 : 594 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
248 : : {
249 : 594 : cb->startup_cb = pgoutput_startup;
250 : 594 : cb->begin_cb = pgoutput_begin_txn;
251 : 594 : cb->change_cb = pgoutput_change;
2199 252 : 594 : cb->truncate_cb = pgoutput_truncate;
1104 akapila@postgresql.o 253 : 594 : cb->message_cb = pgoutput_message;
2642 peter_e@gmx.net 254 : 594 : cb->commit_cb = pgoutput_commit_txn;
255 : :
1005 akapila@postgresql.o 256 : 594 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
257 : 594 : cb->prepare_cb = pgoutput_prepare_txn;
258 : 594 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
259 : 594 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
2642 peter_e@gmx.net 260 : 594 : cb->filter_by_origin_cb = pgoutput_origin_filter;
261 : 594 : cb->shutdown_cb = pgoutput_shutdown;
262 : :
263 : : /* transaction streaming */
1319 akapila@postgresql.o 264 : 594 : cb->stream_start_cb = pgoutput_stream_start;
265 : 594 : cb->stream_stop_cb = pgoutput_stream_stop;
266 : 594 : cb->stream_abort_cb = pgoutput_stream_abort;
267 : 594 : cb->stream_commit_cb = pgoutput_stream_commit;
268 : 594 : cb->stream_change_cb = pgoutput_change;
1104 269 : 594 : cb->stream_message_cb = pgoutput_message;
1319 270 : 594 : cb->stream_truncate_cb = pgoutput_truncate;
271 : : /* transaction streaming - two-phase commit */
984 272 : 594 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
2642 peter_e@gmx.net 273 : 594 : }
274 : :
275 : : static void
1104 akapila@postgresql.o 276 : 318 : parse_output_parameters(List *options, PGOutputData *data)
277 : : {
278 : : ListCell *lc;
2642 peter_e@gmx.net 279 : 318 : bool protocol_version_given = false;
280 : 318 : bool publication_names_given = false;
1366 tgl@sss.pgh.pa.us 281 : 318 : bool binary_option_given = false;
1104 akapila@postgresql.o 282 : 318 : bool messages_option_given = false;
1319 283 : 318 : bool streaming_given = false;
1005 284 : 318 : bool two_phase_option_given = false;
633 285 : 318 : bool origin_option_given = false;
286 : :
1104 287 : 318 : data->binary = false;
461 288 : 318 : data->streaming = LOGICALREP_STREAM_OFF;
1104 289 : 318 : data->messages = false;
1005 290 : 318 : data->two_phase = false;
291 : :
2642 peter_e@gmx.net 292 [ + - + + : 1317 : foreach(lc, options)
+ + ]
293 : : {
294 : 999 : DefElem *defel = (DefElem *) lfirst(lc);
295 : :
296 [ + - - + ]: 999 : Assert(defel->arg == NULL || IsA(defel->arg, String));
297 : :
298 : : /* Check each param, whether or not we recognize it */
299 [ + + ]: 999 : if (strcmp(defel->defname, "proto_version") == 0)
300 : : {
301 : : unsigned long parsed;
302 : : char *endptr;
303 : :
304 [ - + ]: 318 : if (protocol_version_given)
2642 peter_e@gmx.net 305 [ # # ]:UBC 0 : ereport(ERROR,
306 : : (errcode(ERRCODE_SYNTAX_ERROR),
307 : : errmsg("conflicting or redundant options")));
2642 peter_e@gmx.net 308 :CBC 318 : protocol_version_given = true;
309 : :
790 peter@eisentraut.org 310 : 318 : errno = 0;
311 : 318 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
312 [ + - - + ]: 318 : if (errno != 0 || *endptr != '\0')
2642 peter_e@gmx.net 313 [ # # ]:UBC 0 : ereport(ERROR,
314 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
315 : : errmsg("invalid proto_version")));
316 : :
790 peter@eisentraut.org 317 [ - + ]:CBC 318 : if (parsed > PG_UINT32_MAX)
2642 peter_e@gmx.net 318 [ # # ]:UBC 0 : ereport(ERROR,
319 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
320 : : errmsg("proto_version \"%s\" out of range",
321 : : strVal(defel->arg))));
322 : :
1104 akapila@postgresql.o 323 :CBC 318 : data->protocol_version = (uint32) parsed;
324 : : }
2642 peter_e@gmx.net 325 [ + + ]: 681 : else if (strcmp(defel->defname, "publication_names") == 0)
326 : : {
327 [ - + ]: 318 : if (publication_names_given)
2642 peter_e@gmx.net 328 [ # # ]:UBC 0 : ereport(ERROR,
329 : : (errcode(ERRCODE_SYNTAX_ERROR),
330 : : errmsg("conflicting or redundant options")));
2642 peter_e@gmx.net 331 :CBC 318 : publication_names_given = true;
332 : :
333 [ - + ]: 318 : if (!SplitIdentifierString(strVal(defel->arg), ',',
334 : : &data->publication_names))
2524 bruce@momjian.us 335 [ # # ]:UBC 0 : ereport(ERROR,
336 : : (errcode(ERRCODE_INVALID_NAME),
337 : : errmsg("invalid publication_names syntax")));
338 : : }
1366 tgl@sss.pgh.pa.us 339 [ + + ]:CBC 363 : else if (strcmp(defel->defname, "binary") == 0)
340 : : {
341 [ - + ]: 11 : if (binary_option_given)
1366 tgl@sss.pgh.pa.us 342 [ # # ]:UBC 0 : ereport(ERROR,
343 : : (errcode(ERRCODE_SYNTAX_ERROR),
344 : : errmsg("conflicting or redundant options")));
1366 tgl@sss.pgh.pa.us 345 :CBC 11 : binary_option_given = true;
346 : :
1104 akapila@postgresql.o 347 : 11 : data->binary = defGetBoolean(defel);
348 : : }
349 [ + + ]: 352 : else if (strcmp(defel->defname, "messages") == 0)
350 : : {
351 [ - + ]: 4 : if (messages_option_given)
1104 akapila@postgresql.o 352 [ # # ]:UBC 0 : ereport(ERROR,
353 : : (errcode(ERRCODE_SYNTAX_ERROR),
354 : : errmsg("conflicting or redundant options")));
1104 akapila@postgresql.o 355 :CBC 4 : messages_option_given = true;
356 : :
357 : 4 : data->messages = defGetBoolean(defel);
358 : : }
1319 359 [ + + ]: 348 : else if (strcmp(defel->defname, "streaming") == 0)
360 : : {
361 [ - + ]: 33 : if (streaming_given)
1319 akapila@postgresql.o 362 [ # # ]:UBC 0 : ereport(ERROR,
363 : : (errcode(ERRCODE_SYNTAX_ERROR),
364 : : errmsg("conflicting or redundant options")));
1319 akapila@postgresql.o 365 :CBC 33 : streaming_given = true;
366 : :
461 367 : 33 : data->streaming = defGetStreamingMode(defel);
368 : : }
1005 369 [ + + ]: 315 : else if (strcmp(defel->defname, "two_phase") == 0)
370 : : {
371 [ - + ]: 4 : if (two_phase_option_given)
1005 akapila@postgresql.o 372 [ # # ]:UBC 0 : ereport(ERROR,
373 : : (errcode(ERRCODE_SYNTAX_ERROR),
374 : : errmsg("conflicting or redundant options")));
1005 akapila@postgresql.o 375 :CBC 4 : two_phase_option_given = true;
376 : :
377 : 4 : data->two_phase = defGetBoolean(defel);
378 : : }
633 379 [ + - ]: 311 : else if (strcmp(defel->defname, "origin") == 0)
380 : : {
381 : : char *origin;
382 : :
383 [ - + ]: 311 : if (origin_option_given)
633 akapila@postgresql.o 384 [ # # ]:UBC 0 : ereport(ERROR,
385 : : errcode(ERRCODE_SYNTAX_ERROR),
386 : : errmsg("conflicting or redundant options"));
633 akapila@postgresql.o 387 :CBC 311 : origin_option_given = true;
388 : :
200 akapila@postgresql.o 389 :GNC 311 : origin = defGetString(defel);
390 [ + + ]: 311 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
391 : 10 : data->publish_no_origin = true;
392 [ + - ]: 301 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
393 : 301 : data->publish_no_origin = false;
394 : : else
633 akapila@postgresql.o 395 [ # # ]:UBC 0 : ereport(ERROR,
396 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
397 : : errmsg("unrecognized origin value: \"%s\"", origin));
398 : : }
399 : : else
2642 peter_e@gmx.net 400 [ # # ]: 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
401 : : }
402 : :
403 : : /* Check required options */
117 akapila@postgresql.o 404 [ - + ]:GNC 318 : if (!protocol_version_given)
117 akapila@postgresql.o 405 [ # # ]:UNC 0 : ereport(ERROR,
406 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
407 : : errmsg("proto_version option missing"));
117 akapila@postgresql.o 408 [ - + ]:GNC 318 : if (!publication_names_given)
117 akapila@postgresql.o 409 [ # # ]:UNC 0 : ereport(ERROR,
410 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
411 : : errmsg("publication_names option missing"));
2642 peter_e@gmx.net 412 :CBC 318 : }
413 : :
414 : : /*
415 : : * Initialize this plugin
416 : : */
417 : : static void
2524 bruce@momjian.us 418 : 594 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
419 : : bool is_init)
420 : : {
421 : 594 : PGOutputData *data = palloc0(sizeof(PGOutputData));
422 : : static bool publication_callback_registered = false;
423 : :
424 : : /* Create our memory context for private allocations. */
2642 peter_e@gmx.net 425 : 594 : data->context = AllocSetContextCreate(ctx->context,
426 : : "logical replication output context",
427 : : ALLOCSET_DEFAULT_SIZES);
428 : :
782 akapila@postgresql.o 429 : 594 : data->cachectx = AllocSetContextCreate(ctx->context,
430 : : "logical replication cache context",
431 : : ALLOCSET_DEFAULT_SIZES);
432 : :
2642 peter_e@gmx.net 433 : 594 : ctx->output_plugin_private = data;
434 : :
435 : : /* This plugin uses binary protocol. */
436 : 594 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
437 : :
438 : : /*
439 : : * This is replication start and not slot initialization.
440 : : *
441 : : * Parse and validate options passed by the client.
442 : : */
443 [ + + ]: 594 : if (!is_init)
444 : : {
445 : : /* Parse the params and ERROR if we see any we don't recognize */
1104 akapila@postgresql.o 446 : 318 : parse_output_parameters(ctx->output_plugin_options, data);
447 : :
448 : : /* Check if we support requested protocol */
1296 449 [ - + ]: 318 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
2642 peter_e@gmx.net 450 [ # # ]:UBC 0 : ereport(ERROR,
451 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
452 : : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
453 : : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
454 : :
2642 peter_e@gmx.net 455 [ - + ]:CBC 318 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
2642 peter_e@gmx.net 456 [ # # ]:UBC 0 : ereport(ERROR,
457 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458 : : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
459 : : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
460 : :
461 : : /*
462 : : * Decide whether to enable streaming. It is disabled by default, in
463 : : * which case we just update the flag in decoding context. Otherwise
464 : : * we only allow it with sufficient version of the protocol, and when
465 : : * the output plugin supports it.
466 : : */
461 akapila@postgresql.o 467 [ + + ]:CBC 318 : if (data->streaming == LOGICALREP_STREAM_OFF)
1319 468 : 285 : ctx->streaming = false;
461 469 [ + + ]: 33 : else if (data->streaming == LOGICALREP_STREAM_ON &&
470 [ - + ]: 26 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
1319 akapila@postgresql.o 471 [ # # ]:UBC 0 : ereport(ERROR,
472 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
473 : : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
474 : : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
461 akapila@postgresql.o 475 [ + + ]:CBC 33 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
476 [ - + ]: 7 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
461 akapila@postgresql.o 477 [ # # ]:UBC 0 : ereport(ERROR,
478 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
479 : : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
480 : : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
1319 akapila@postgresql.o 481 [ - + ]:CBC 33 : else if (!ctx->streaming)
1319 akapila@postgresql.o 482 [ # # ]:UBC 0 : ereport(ERROR,
483 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
484 : : errmsg("streaming requested, but not supported by output plugin")));
485 : :
486 : : /*
487 : : * Here, we just check whether the two-phase option is passed by
488 : : * plugin and decide whether to enable it at later point of time. It
489 : : * remains enabled if the previous start-up has done so. But we only
490 : : * allow the option to be passed in with sufficient version of the
491 : : * protocol, and when the output plugin supports it.
492 : : */
1005 akapila@postgresql.o 493 [ + + ]:CBC 318 : if (!data->two_phase)
494 : 314 : ctx->twophase_opt_given = false;
495 [ - + ]: 4 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
1005 akapila@postgresql.o 496 [ # # ]:UBC 0 : ereport(ERROR,
497 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498 : : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
499 : : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
1005 akapila@postgresql.o 500 [ - + ]:CBC 4 : else if (!ctx->twophase)
1005 akapila@postgresql.o 501 [ # # ]:UBC 0 : ereport(ERROR,
502 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
503 : : errmsg("two-phase commit requested, but not supported by output plugin")));
504 : : else
1005 akapila@postgresql.o 505 :CBC 4 : ctx->twophase_opt_given = true;
506 : :
507 : : /* Init publication state. */
2642 peter_e@gmx.net 508 : 318 : data->publications = NIL;
509 : 318 : publications_valid = false;
510 : :
511 : : /*
512 : : * Register callback for pg_publication if we didn't already do that
513 : : * during some previous call in this process.
514 : : */
416 tgl@sss.pgh.pa.us 515 [ + + ]: 318 : if (!publication_callback_registered)
516 : : {
517 : 317 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
518 : : publication_invalidation_cb,
519 : : (Datum) 0);
520 : 317 : publication_callback_registered = true;
521 : : }
522 : :
523 : : /* Initialize relation schema cache. */
2642 peter_e@gmx.net 524 : 318 : init_rel_sync_cache(CacheMemoryContext);
525 : : }
526 : : else
527 : : {
528 : : /*
529 : : * Disable the streaming and prepared transactions during the slot
530 : : * initialization mode.
531 : : */
1319 akapila@postgresql.o 532 : 276 : ctx->streaming = false;
1005 533 : 276 : ctx->twophase = false;
534 : : }
2642 peter_e@gmx.net 535 : 594 : }
536 : :
537 : : /*
538 : : * BEGIN callback.
539 : : *
540 : : * Don't send the BEGIN message here instead postpone it until the first
541 : : * change. In logical replication, a common scenario is to replicate a set of
542 : : * tables (instead of all tables) and transactions whose changes were on
543 : : * the table(s) that are not published will produce empty transactions. These
544 : : * empty transactions will send BEGIN and COMMIT messages to subscribers,
545 : : * using bandwidth on something with little/no use for logical replication.
546 : : */
547 : : static void
703 tgl@sss.pgh.pa.us 548 : 638 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
549 : : {
550 : 638 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
551 : : sizeof(PGOutputTxnData));
552 : :
746 akapila@postgresql.o 553 : 638 : txn->output_plugin_private = txndata;
554 : 638 : }
555 : :
556 : : /*
557 : : * Send BEGIN.
558 : : *
559 : : * This is called while processing the first change of the transaction.
560 : : */
561 : : static void
562 : 360 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
563 : : {
2524 bruce@momjian.us 564 : 360 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
746 akapila@postgresql.o 565 : 360 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
566 : :
567 [ - + ]: 360 : Assert(txndata);
568 [ - + ]: 360 : Assert(!txndata->sent_begin_txn);
569 : :
2642 peter_e@gmx.net 570 : 360 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
571 : 360 : logicalrep_write_begin(ctx->out, txn);
746 akapila@postgresql.o 572 : 360 : txndata->sent_begin_txn = true;
573 : :
1005 574 : 360 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
575 : : send_replication_origin);
576 : :
2642 peter_e@gmx.net 577 : 360 : OutputPluginWrite(ctx, true);
578 : 360 : }
579 : :
580 : : /*
581 : : * COMMIT callback
582 : : */
583 : : static void
584 : 635 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
585 : : XLogRecPtr commit_lsn)
586 : : {
746 akapila@postgresql.o 587 : 635 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
588 : : bool sent_begin_txn;
589 : :
590 [ - + ]: 635 : Assert(txndata);
591 : :
592 : : /*
593 : : * We don't need to send the commit message unless some relevant change
594 : : * from this transaction has been sent to the downstream.
595 : : */
596 : 635 : sent_begin_txn = txndata->sent_begin_txn;
431 597 : 635 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
746 598 : 635 : pfree(txndata);
599 : 635 : txn->output_plugin_private = NULL;
600 : :
601 [ + + ]: 635 : if (!sent_begin_txn)
602 : : {
603 [ + + ]: 275 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
604 : 275 : return;
605 : : }
606 : :
2642 peter_e@gmx.net 607 : 360 : OutputPluginPrepareWrite(ctx, true);
608 : 360 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
609 : 360 : OutputPluginWrite(ctx, true);
610 : : }
611 : :
612 : : /*
613 : : * BEGIN PREPARE callback
614 : : */
615 : : static void
1005 akapila@postgresql.o 616 : 14 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
617 : : {
618 : 14 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
619 : :
620 : 14 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
621 : 14 : logicalrep_write_begin_prepare(ctx->out, txn);
622 : :
623 : 14 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
624 : : send_replication_origin);
625 : :
626 : 14 : OutputPluginWrite(ctx, true);
627 : 14 : }
628 : :
629 : : /*
630 : : * PREPARE callback
631 : : */
632 : : static void
633 : 14 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
634 : : XLogRecPtr prepare_lsn)
635 : : {
431 636 : 14 : OutputPluginUpdateProgress(ctx, false);
637 : :
1005 638 : 14 : OutputPluginPrepareWrite(ctx, true);
639 : 14 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
640 : 14 : OutputPluginWrite(ctx, true);
641 : 14 : }
642 : :
643 : : /*
644 : : * COMMIT PREPARED callback
645 : : */
646 : : static void
647 : 20 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
648 : : XLogRecPtr commit_lsn)
649 : : {
431 650 : 20 : OutputPluginUpdateProgress(ctx, false);
651 : :
1005 652 : 20 : OutputPluginPrepareWrite(ctx, true);
653 : 20 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
654 : 20 : OutputPluginWrite(ctx, true);
655 : 20 : }
656 : :
657 : : /*
658 : : * ROLLBACK PREPARED callback
659 : : */
660 : : static void
661 : 5 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
662 : : ReorderBufferTXN *txn,
663 : : XLogRecPtr prepare_end_lsn,
664 : : TimestampTz prepare_time)
665 : : {
431 666 : 5 : OutputPluginUpdateProgress(ctx, false);
667 : :
1005 668 : 5 : OutputPluginPrepareWrite(ctx, true);
669 : 5 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
670 : : prepare_time);
671 : 5 : OutputPluginWrite(ctx, true);
672 : 5 : }
673 : :
674 : : /*
675 : : * Write the current schema of the relation and its ancestor (if any) if not
676 : : * done yet.
677 : : */
678 : : static void
2199 peter_e@gmx.net 679 : 182090 : maybe_send_schema(LogicalDecodingContext *ctx,
680 : : ReorderBufferChange *change,
681 : : Relation relation, RelationSyncEntry *relentry)
682 : : {
199 michael@paquier.xyz 683 :GNC 182090 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
684 : : bool schema_sent;
1319 akapila@postgresql.o 685 :CBC 182090 : TransactionId xid = InvalidTransactionId;
686 : 182090 : TransactionId topxid = InvalidTransactionId;
687 : :
688 : : /*
689 : : * Remember XID of the (sub)transaction for the change. We don't care if
690 : : * it's top-level transaction or not (we have already sent that XID in
691 : : * start of the current streaming block).
692 : : *
693 : : * If we're not in a streaming block, just use InvalidTransactionId and
694 : : * the write methods will not include it.
695 : : */
199 michael@paquier.xyz 696 [ + + ]:GNC 182090 : if (data->in_streaming)
1319 akapila@postgresql.o 697 :CBC 175907 : xid = change->txn->xid;
698 : :
394 699 [ + + ]: 182090 : if (rbtxn_is_subtxn(change->txn))
700 [ + - ]: 10169 : topxid = rbtxn_get_toptxn(change->txn)->xid;
701 : : else
1319 702 : 171921 : topxid = xid;
703 : :
704 : : /*
705 : : * Do we need to send the schema? We do track streamed transactions
706 : : * separately, because those may be applied later (and the regular
707 : : * transactions won't see their effects until then) and in an order that
708 : : * we don't know at this point.
709 : : *
710 : : * XXX There is a scope of optimization here. Currently, we always send
711 : : * the schema first time in a streaming transaction but we can probably
712 : : * avoid that by checking 'relentry->schema_sent' flag. However, before
713 : : * doing that we need to study its impact on the case where we have a mix
714 : : * of streaming and non-streaming transactions.
715 : : */
199 michael@paquier.xyz 716 [ + + ]:GNC 182090 : if (data->in_streaming)
1319 akapila@postgresql.o 717 :CBC 175907 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
718 : : else
719 : 6183 : schema_sent = relentry->schema_sent;
720 : :
721 : : /* Nothing to do if we already sent the schema. */
722 [ + + ]: 182090 : if (schema_sent)
1467 peter@eisentraut.org 723 : 181811 : return;
724 : :
725 : : /*
726 : : * Send the schema. If the changes will be published using an ancestor's
727 : : * schema, not the relation's own, send that ancestor's schema before
728 : : * sending relation's own (XXX - maybe sending only the former suffices?).
729 : : */
730 [ + + ]: 279 : if (relentry->publish_as_relid != RelationGetRelid(relation))
731 : : {
732 : 29 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
733 : :
750 tomas.vondra@postgre 734 : 29 : send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
1467 peter@eisentraut.org 735 : 29 : RelationClose(ancestor);
736 : : }
737 : :
750 tomas.vondra@postgre 738 : 279 : send_relation_and_attrs(relation, xid, ctx, relentry->columns);
739 : :
199 michael@paquier.xyz 740 [ + + ]:GNC 279 : if (data->in_streaming)
1319 akapila@postgresql.o 741 :CBC 66 : set_schema_sent_in_streamed_txn(relentry, topxid);
742 : : else
743 : 213 : relentry->schema_sent = true;
744 : : }
745 : :
746 : : /*
747 : : * Sends a relation
748 : : */
749 : : static void
750 : 308 : send_relation_and_attrs(Relation relation, TransactionId xid,
751 : : LogicalDecodingContext *ctx,
752 : : Bitmapset *columns)
753 : : {
1467 peter@eisentraut.org 754 : 308 : TupleDesc desc = RelationGetDescr(relation);
755 : : int i;
756 : :
757 : : /*
758 : : * Write out type info if needed. We do that only for user-created types.
759 : : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
760 : : * objects with hand-assigned OIDs to be "built in", not for instance any
761 : : * function or type defined in the information_schema. This is important
762 : : * because only hand-assigned OIDs can be expected to remain stable across
763 : : * major versions.
764 : : */
765 [ + + ]: 917 : for (i = 0; i < desc->natts; i++)
766 : : {
767 : 609 : Form_pg_attribute att = TupleDescAttr(desc, i);
768 : :
769 [ + + + + ]: 609 : if (att->attisdropped || att->attgenerated)
770 : 5 : continue;
771 : :
772 [ + + ]: 604 : if (att->atttypid < FirstGenbkiObjectId)
773 : 586 : continue;
774 : :
775 : : /* Skip this attribute if it's not present in the column list */
750 tomas.vondra@postgre 776 [ + + - + ]: 18 : if (columns != NULL && !bms_is_member(att->attnum, columns))
750 tomas.vondra@postgre 777 :UBC 0 : continue;
778 : :
2199 peter_e@gmx.net 779 :CBC 18 : OutputPluginPrepareWrite(ctx, false);
1319 akapila@postgresql.o 780 : 18 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
2199 peter_e@gmx.net 781 : 18 : OutputPluginWrite(ctx, false);
782 : : }
783 : :
1467 peter@eisentraut.org 784 : 308 : OutputPluginPrepareWrite(ctx, false);
750 tomas.vondra@postgre 785 : 308 : logicalrep_write_rel(ctx->out, xid, relation, columns);
1467 peter@eisentraut.org 786 : 308 : OutputPluginWrite(ctx, false);
2199 peter_e@gmx.net 787 : 308 : }
788 : :
789 : : /*
790 : : * Executor state preparation for evaluation of row filter expressions for the
791 : : * specified relation.
792 : : */
793 : : static EState *
782 akapila@postgresql.o 794 : 16 : create_estate_for_relation(Relation rel)
795 : : {
796 : : EState *estate;
797 : : RangeTblEntry *rte;
405 tgl@sss.pgh.pa.us 798 : 16 : List *perminfos = NIL;
799 : :
782 akapila@postgresql.o 800 : 16 : estate = CreateExecutorState();
801 : :
802 : 16 : rte = makeNode(RangeTblEntry);
803 : 16 : rte->rtekind = RTE_RELATION;
804 : 16 : rte->relid = RelationGetRelid(rel);
805 : 16 : rte->relkind = rel->rd_rel->relkind;
806 : 16 : rte->rellockmode = AccessShareLock;
807 : :
405 tgl@sss.pgh.pa.us 808 : 16 : addRTEPermissionInfo(&perminfos, rte);
809 : :
810 : 16 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
811 : :
782 akapila@postgresql.o 812 : 16 : estate->es_output_cid = GetCurrentCommandId(false);
813 : :
814 : 16 : return estate;
815 : : }
816 : :
817 : : /*
818 : : * Evaluates row filter.
819 : : *
820 : : * If the row filter evaluates to NULL, it is taken as false i.e. the change
821 : : * isn't replicated.
822 : : */
823 : : static bool
824 : 36 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
825 : : {
826 : : Datum ret;
827 : : bool isnull;
828 : :
829 [ - + ]: 36 : Assert(state != NULL);
830 : :
831 : 36 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
832 : :
833 [ - + - - : 36 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
- - - - ]
834 : : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
835 : : isnull ? "true" : "false");
836 : :
837 [ + + ]: 36 : if (isnull)
838 : 1 : return false;
839 : :
840 : 35 : return DatumGetBool(ret);
841 : : }
842 : :
843 : : /*
844 : : * Make sure the per-entry memory context exists.
845 : : */
846 : : static void
750 tomas.vondra@postgre 847 : 53 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
848 : : {
849 : : Relation relation;
850 : :
851 : : /* The context may already exist, in which case bail out. */
852 [ + + ]: 53 : if (entry->entry_cxt)
853 : 2 : return;
854 : :
855 : 51 : relation = RelationIdGetRelation(entry->publish_as_relid);
856 : :
857 : 51 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
858 : : "entry private context",
859 : : ALLOCSET_SMALL_SIZES);
860 : :
861 : 51 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
862 : : RelationGetRelationName(relation));
863 : : }
864 : :
865 : : /*
866 : : * Initialize the row filter.
867 : : */
868 : : static void
782 akapila@postgresql.o 869 : 243 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
870 : : RelationSyncEntry *entry)
871 : : {
872 : : ListCell *lc;
873 : 243 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
874 : 243 : bool no_filter[] = {false, false, false}; /* One per pubaction */
875 : : MemoryContext oldctx;
876 : : int idx;
877 : 243 : bool has_filter = true;
569 878 : 243 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
879 : :
880 : : /*
881 : : * Find if there are any row filters for this relation. If there are, then
882 : : * prepare the necessary ExprState and cache it in entry->exprstate. To
883 : : * build an expression state, we need to ensure the following:
884 : : *
885 : : * All the given publication-table mappings must be checked.
886 : : *
887 : : * Multiple publications might have multiple row filters for this
888 : : * relation. Since row filter usage depends on the DML operation, there
889 : : * are multiple lists (one for each operation) to which row filters will
890 : : * be appended.
891 : : *
892 : : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
893 : : * expression" so it takes precedence.
894 : : */
782 895 [ + - + + : 263 : foreach(lc, publications)
+ + ]
896 : : {
897 : 247 : Publication *pub = lfirst(lc);
898 : 247 : HeapTuple rftuple = NULL;
899 : 247 : Datum rfdatum = 0;
569 900 : 247 : bool pub_no_filter = true;
901 : :
902 : : /*
903 : : * If the publication is FOR ALL TABLES, or the publication includes a
904 : : * FOR TABLES IN SCHEMA where the table belongs to the referred
905 : : * schema, then it is treated the same as if there are no row filters
906 : : * (even if other publications have a row filter).
907 : : */
908 [ + + ]: 247 : if (!pub->alltables &&
909 [ + + ]: 177 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
910 : : ObjectIdGetDatum(schemaid),
911 : : ObjectIdGetDatum(pub->oid)))
912 : : {
913 : : /*
914 : : * Check for the presence of a row filter in this publication.
915 : : */
782 916 : 171 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
917 : : ObjectIdGetDatum(entry->publish_as_relid),
918 : : ObjectIdGetDatum(pub->oid));
919 : :
920 [ + + ]: 171 : if (HeapTupleIsValid(rftuple))
921 : : {
922 : : /* Null indicates no filter. */
923 : 159 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
924 : : Anum_pg_publication_rel_prqual,
925 : : &pub_no_filter);
926 : : }
927 : : }
928 : :
929 [ + + ]: 247 : if (pub_no_filter)
930 : : {
931 [ + + ]: 234 : if (rftuple)
932 : 146 : ReleaseSysCache(rftuple);
933 : :
934 : 234 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
935 : 234 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
936 : 234 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
937 : :
938 : : /*
939 : : * Quick exit if all the DML actions are publicized via this
940 : : * publication.
941 : : */
942 [ + - ]: 234 : if (no_filter[PUBACTION_INSERT] &&
943 [ + + ]: 234 : no_filter[PUBACTION_UPDATE] &&
944 [ + - ]: 227 : no_filter[PUBACTION_DELETE])
945 : : {
946 : 227 : has_filter = false;
947 : 227 : break;
948 : : }
949 : :
950 : : /* No additional work for this publication. Next one. */
951 : 7 : continue;
952 : : }
953 : :
954 : : /* Form the per pubaction row filter lists. */
955 [ + - + - ]: 13 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
956 : 13 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
957 : 13 : TextDatumGetCString(rfdatum));
958 [ + - + - ]: 13 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
959 : 13 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
960 : 13 : TextDatumGetCString(rfdatum));
961 [ + - + - ]: 13 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
962 : 13 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
963 : 13 : TextDatumGetCString(rfdatum));
964 : :
965 : 13 : ReleaseSysCache(rftuple);
966 : : } /* loop all subscribed publications */
967 : :
968 : : /* Clean the row filter */
969 [ + + ]: 972 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
970 : : {
971 [ + + ]: 729 : if (no_filter[idx])
972 : : {
973 : 690 : list_free_deep(rfnodes[idx]);
974 : 690 : rfnodes[idx] = NIL;
975 : : }
976 : : }
977 : :
978 [ + + ]: 243 : if (has_filter)
979 : : {
980 : 16 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
981 : :
750 tomas.vondra@postgre 982 : 16 : pgoutput_ensure_entry_cxt(data, entry);
983 : :
984 : : /*
985 : : * Now all the filters for all pubactions are known. Combine them when
986 : : * their pubactions are the same.
987 : : */
988 : 16 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
782 akapila@postgresql.o 989 : 16 : entry->estate = create_estate_for_relation(relation);
990 [ + + ]: 64 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
991 : : {
992 : 48 : List *filters = NIL;
993 : : Expr *rfnode;
994 : :
995 [ + + ]: 48 : if (rfnodes[idx] == NIL)
996 : 21 : continue;
997 : :
998 [ + - + + : 57 : foreach(lc, rfnodes[idx])
+ + ]
999 : 30 : filters = lappend(filters, stringToNode((char *) lfirst(lc)));
1000 : :
1001 : : /* combine the row filter and cache the ExprState */
1002 : 27 : rfnode = make_orclause(filters);
1003 : 27 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1004 : : } /* for each pubaction */
1005 : 16 : MemoryContextSwitchTo(oldctx);
1006 : :
1007 : 16 : RelationClose(relation);
1008 : : }
1009 : 243 : }
1010 : :
1011 : : /*
1012 : : * Initialize the column list.
1013 : : */
1014 : : static void
750 tomas.vondra@postgre 1015 : 243 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1016 : : RelationSyncEntry *entry)
1017 : : {
1018 : : ListCell *lc;
682 akapila@postgresql.o 1019 : 243 : bool first = true;
1020 : 243 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1021 : :
1022 : : /*
1023 : : * Find if there are any column lists for this relation. If there are,
1024 : : * build a bitmap using the column lists.
1025 : : *
1026 : : * Multiple publications might have multiple column lists for this
1027 : : * relation.
1028 : : *
1029 : : * Note that we don't support the case where the column list is different
1030 : : * for the same table when combining publications. See comments atop
1031 : : * fetch_table_list. But one can later change the publication so we still
1032 : : * need to check all the given publication-table mappings and report an
1033 : : * error if any publications have a different column list.
1034 : : *
1035 : : * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
1036 : : */
750 tomas.vondra@postgre 1037 [ + - + + : 496 : foreach(lc, publications)
+ + ]
1038 : : {
1039 : 254 : Publication *pub = lfirst(lc);
1040 : 254 : HeapTuple cftuple = NULL;
1041 : 254 : Datum cfdatum = 0;
682 akapila@postgresql.o 1042 : 254 : Bitmapset *cols = NULL;
1043 : :
1044 : : /*
1045 : : * If the publication is FOR ALL TABLES then it is treated the same as
1046 : : * if there are no column lists (even if other publications have a
1047 : : * list).
1048 : : */
750 tomas.vondra@postgre 1049 [ + + ]: 254 : if (!pub->alltables)
1050 : : {
682 akapila@postgresql.o 1051 : 183 : bool pub_no_list = true;
1052 : :
1053 : : /*
1054 : : * Check for the presence of a column list in this publication.
1055 : : *
1056 : : * Note: If we find no pg_publication_rel row, it's a publication
1057 : : * defined for a whole schema, so it can't have a column list,
1058 : : * just like a FOR ALL TABLES publication.
1059 : : */
750 tomas.vondra@postgre 1060 : 183 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
1061 : : ObjectIdGetDatum(entry->publish_as_relid),
1062 : : ObjectIdGetDatum(pub->oid));
1063 : :
1064 [ + + ]: 183 : if (HeapTupleIsValid(cftuple))
1065 : : {
1066 : : /* Lookup the column list attribute. */
1067 : 164 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1068 : : Anum_pg_publication_rel_prattrs,
1069 : : &pub_no_list);
1070 : :
1071 : : /* Build the column list bitmap in the per-entry context. */
1072 [ + + ]: 164 : if (!pub_no_list) /* when not null */
1073 : : {
1074 : : int i;
457 akapila@postgresql.o 1075 : 37 : int nliveatts = 0;
1076 : 37 : TupleDesc desc = RelationGetDescr(relation);
1077 : :
750 tomas.vondra@postgre 1078 : 37 : pgoutput_ensure_entry_cxt(data, entry);
1079 : :
682 akapila@postgresql.o 1080 : 37 : cols = pub_collist_to_bitmapset(cols, cfdatum,
1081 : : entry->entry_cxt);
1082 : :
1083 : : /* Get the number of live attributes. */
457 1084 [ + + ]: 155 : for (i = 0; i < desc->natts; i++)
1085 : : {
1086 : 118 : Form_pg_attribute att = TupleDescAttr(desc, i);
1087 : :
1088 [ + + + + ]: 118 : if (att->attisdropped || att->attgenerated)
1089 : 2 : continue;
1090 : :
1091 : 116 : nliveatts++;
1092 : : }
1093 : :
1094 : : /*
1095 : : * If column list includes all the columns of the table,
1096 : : * set it to NULL.
1097 : : */
1098 [ + + ]: 37 : if (bms_num_members(cols) == nliveatts)
1099 : : {
682 1100 : 7 : bms_free(cols);
1101 : 7 : cols = NULL;
1102 : : }
1103 : : }
1104 : :
1105 : 164 : ReleaseSysCache(cftuple);
1106 : : }
1107 : : }
1108 : :
1109 [ + + ]: 254 : if (first)
1110 : : {
1111 : 243 : entry->columns = cols;
1112 : 243 : first = false;
1113 : : }
1114 [ + + ]: 11 : else if (!bms_equal(entry->columns, cols))
1115 [ + - ]: 1 : ereport(ERROR,
1116 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1117 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1118 : : get_namespace_name(RelationGetNamespace(relation)),
1119 : : RelationGetRelationName(relation)));
1120 : : } /* loop all subscribed publications */
1121 : :
1122 : 242 : RelationClose(relation);
750 tomas.vondra@postgre 1123 : 242 : }
1124 : :
1125 : : /*
1126 : : * Initialize the slot for storing new and old tuples, and build the map that
1127 : : * will be used to convert the relation's tuples into the ancestor's format.
1128 : : */
1129 : : static void
782 akapila@postgresql.o 1130 : 243 : init_tuple_slot(PGOutputData *data, Relation relation,
1131 : : RelationSyncEntry *entry)
1132 : : {
1133 : : MemoryContext oldctx;
1134 : : TupleDesc oldtupdesc;
1135 : : TupleDesc newtupdesc;
1136 : :
1137 : 243 : oldctx = MemoryContextSwitchTo(data->cachectx);
1138 : :
1139 : : /*
1140 : : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1141 : : * live as long as the cache remains.
1142 : : */
139 1143 : 243 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1144 : 243 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1145 : :
782 1146 : 243 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1147 : 243 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1148 : :
1149 : 243 : MemoryContextSwitchTo(oldctx);
1150 : :
1151 : : /*
1152 : : * Cache the map that will be used to convert the relation's tuples into
1153 : : * the ancestor's format, if needed.
1154 : : */
1155 [ + + ]: 243 : if (entry->publish_as_relid != RelationGetRelid(relation))
1156 : : {
1157 : 30 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1158 : 30 : TupleDesc indesc = RelationGetDescr(relation);
1159 : 30 : TupleDesc outdesc = RelationGetDescr(ancestor);
1160 : :
1161 : : /* Map must live as long as the session does. */
1162 : 30 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1163 : :
502 alvherre@alvh.no-ip. 1164 : 30 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1165 : :
782 akapila@postgresql.o 1166 : 30 : MemoryContextSwitchTo(oldctx);
1167 : 30 : RelationClose(ancestor);
1168 : : }
1169 : 243 : }
1170 : :
1171 : : /*
1172 : : * Change is checked against the row filter if any.
1173 : : *
1174 : : * Returns true if the change is to be replicated, else false.
1175 : : *
1176 : : * For inserts, evaluate the row filter for new tuple.
1177 : : * For deletes, evaluate the row filter for old tuple.
1178 : : * For updates, evaluate the row filter for old and new tuple.
1179 : : *
1180 : : * For updates, if both evaluations are true, we allow sending the UPDATE and
1181 : : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1182 : : * only one of the tuples matches the row filter expression, we transform
1183 : : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1184 : : * following rules:
1185 : : *
1186 : : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1187 : : * Case 2: old-row (no match) new row (match) -> INSERT
1188 : : * Case 3: old-row (match) new-row (no match) -> DELETE
1189 : : * Case 4: old-row (match) new row (match) -> UPDATE
1190 : : *
1191 : : * The new action is updated in the action parameter.
1192 : : *
1193 : : * The new slot could be updated when transforming the UPDATE into INSERT,
1194 : : * because the original new tuple might not have column values from the replica
1195 : : * identity.
1196 : : *
1197 : : * Examples:
1198 : : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1199 : : * Since the old tuple satisfies, the initial table synchronization copied this
1200 : : * row (or another method was used to guarantee that there is data
1201 : : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1202 : : * row filter, so from a data consistency perspective, that row should be
1203 : : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1204 : : * statement and be sent to the subscriber. Keeping this row on the subscriber
1205 : : * is undesirable because it doesn't reflect what was defined in the row filter
1206 : : * expression on the publisher. This row on the subscriber would likely not be
1207 : : * modified by replication again. If someone inserted a new row with the same
1208 : : * old identifier, replication could stop due to a constraint violation.
1209 : : *
1210 : : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1211 : : * Since the old tuple doesn't satisfy, the initial table synchronization
1212 : : * probably didn't copy this row. However, after the UPDATE the new tuple does
1213 : : * satisfy the row filter, so from a data consistency perspective, that row
1214 : : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1215 : : * statements have no effect (it matches no row -- see
1216 : : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1217 : : * INSERT statement and be sent to the subscriber. However, this might surprise
1218 : : * someone who expects the data set to satisfy the row filter expression on the
1219 : : * provider.
1220 : : */
1221 : : static bool
1222 : 182088 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1223 : : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1224 : : ReorderBufferChangeType *action)
1225 : : {
1226 : : TupleDesc desc;
1227 : : int i;
1228 : : bool old_matched,
1229 : : new_matched,
1230 : : result;
1231 : : TupleTableSlot *tmp_new_slot;
1232 : 182088 : TupleTableSlot *new_slot = *new_slot_ptr;
1233 : : ExprContext *ecxt;
1234 : : ExprState *filter_exprstate;
1235 : :
1236 : : /*
1237 : : * We need this map to avoid relying on ReorderBufferChangeType enums
1238 : : * having specific values.
1239 : : */
1240 : : static const int map_changetype_pubaction[] = {
1241 : : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1242 : : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1243 : : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1244 : : };
1245 : :
1246 [ + + + + : 182088 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
- + ]
1247 : : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1248 : : *action == REORDER_BUFFER_CHANGE_DELETE);
1249 : :
1250 [ + + - + ]: 182088 : Assert(new_slot || old_slot);
1251 : :
1252 : : /* Get the corresponding row filter */
1253 : 182088 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1254 : :
1255 : : /* Bail out if there is no row filter */
1256 [ + + ]: 182088 : if (!filter_exprstate)
1257 : 182056 : return true;
1258 : :
1259 [ - + ]: 32 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1260 : : get_namespace_name(RelationGetNamespace(relation)),
1261 : : RelationGetRelationName(relation));
1262 : :
1263 [ + + ]: 32 : ResetPerTupleExprContext(entry->estate);
1264 : :
1265 [ + + ]: 32 : ecxt = GetPerTupleExprContext(entry->estate);
1266 : :
1267 : : /*
1268 : : * For the following occasions where there is only one tuple, we can
1269 : : * evaluate the row filter for that tuple and return.
1270 : : *
1271 : : * For inserts, we only have the new tuple.
1272 : : *
1273 : : * For updates, we can have only a new tuple when none of the replica
1274 : : * identity columns changed and none of those columns have external data
1275 : : * but we still need to evaluate the row filter for the new tuple as the
1276 : : * existing values of those columns might not match the filter. Also,
1277 : : * users can use constant expressions in the row filter, so we anyway need
1278 : : * to evaluate it for the new tuple.
1279 : : *
1280 : : * For deletes, we only have the old tuple.
1281 : : */
1282 [ + + + + ]: 32 : if (!new_slot || !old_slot)
1283 : : {
1284 [ + + ]: 28 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1285 : 28 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1286 : :
1287 : 28 : return result;
1288 : : }
1289 : :
1290 : : /*
1291 : : * Both the old and new tuples must be valid only for updates and need to
1292 : : * be checked against the row filter.
1293 : : */
1294 [ - + ]: 4 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1295 : :
1296 : 4 : slot_getallattrs(new_slot);
1297 : 4 : slot_getallattrs(old_slot);
1298 : :
1299 : 4 : tmp_new_slot = NULL;
1300 : 4 : desc = RelationGetDescr(relation);
1301 : :
1302 : : /*
1303 : : * The new tuple might not have all the replica identity columns, in which
1304 : : * case it needs to be copied over from the old tuple.
1305 : : */
1306 [ + + ]: 12 : for (i = 0; i < desc->natts; i++)
1307 : : {
1308 : 8 : Form_pg_attribute att = TupleDescAttr(desc, i);
1309 : :
1310 : : /*
1311 : : * if the column in the new tuple or old tuple is null, nothing to do
1312 : : */
1313 [ + + - + ]: 8 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1314 : 1 : continue;
1315 : :
1316 : : /*
1317 : : * Unchanged toasted replica identity columns are only logged in the
1318 : : * old tuple. Copy this over to the new tuple. The changed (or WAL
1319 : : * Logged) toast values are always assembled in memory and set as
1320 : : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1321 : : */
1322 [ + + ]: 7 : if (att->attlen == -1 &&
1323 [ + + + - ]: 4 : VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1324 [ - + - - ]: 1 : !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1325 : : {
1326 [ + - ]: 1 : if (!tmp_new_slot)
1327 : : {
1328 : 1 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1329 : 1 : ExecClearTuple(tmp_new_slot);
1330 : :
1331 : 1 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1332 : 1 : desc->natts * sizeof(Datum));
1333 : 1 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1334 : 1 : desc->natts * sizeof(bool));
1335 : : }
1336 : :
1337 : 1 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1338 : 1 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1339 : : }
1340 : : }
1341 : :
1342 : 4 : ecxt->ecxt_scantuple = old_slot;
1343 : 4 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1344 : :
1345 [ + + ]: 4 : if (tmp_new_slot)
1346 : : {
1347 : 1 : ExecStoreVirtualTuple(tmp_new_slot);
1348 : 1 : ecxt->ecxt_scantuple = tmp_new_slot;
1349 : : }
1350 : : else
1351 : 3 : ecxt->ecxt_scantuple = new_slot;
1352 : :
1353 : 4 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1354 : :
1355 : : /*
1356 : : * Case 1: if both tuples don't match the row filter, bailout. Send
1357 : : * nothing.
1358 : : */
1359 [ + + - + ]: 4 : if (!old_matched && !new_matched)
782 akapila@postgresql.o 1360 :UBC 0 : return false;
1361 : :
1362 : : /*
1363 : : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1364 : : * tuple does, transform the UPDATE into INSERT.
1365 : : *
1366 : : * Use the newly transformed tuple that must contain the column values for
1367 : : * all the replica identity columns. This is required to ensure that the
1368 : : * while inserting the tuple in the downstream node, we have all the
1369 : : * required column values.
1370 : : */
782 akapila@postgresql.o 1371 [ + + + - ]:CBC 4 : if (!old_matched && new_matched)
1372 : : {
1373 : 2 : *action = REORDER_BUFFER_CHANGE_INSERT;
1374 : :
1375 [ + + ]: 2 : if (tmp_new_slot)
1376 : 1 : *new_slot_ptr = tmp_new_slot;
1377 : : }
1378 : :
1379 : : /*
1380 : : * Case 3: if the old tuple satisfies the row filter but the new tuple
1381 : : * doesn't, transform the UPDATE into DELETE.
1382 : : *
1383 : : * This transformation does not require another tuple. The Old tuple will
1384 : : * be used for DELETE.
1385 : : */
1386 [ + - + + ]: 2 : else if (old_matched && !new_matched)
1387 : 1 : *action = REORDER_BUFFER_CHANGE_DELETE;
1388 : :
1389 : : /*
1390 : : * Case 4: if both tuples match the row filter, transformation isn't
1391 : : * required. (*action is default UPDATE).
1392 : : */
1393 : :
1394 : 4 : return true;
1395 : : }
1396 : :
1397 : : /*
1398 : : * Sends the decoded DML over wire.
1399 : : *
1400 : : * This is called both in streaming and non-streaming modes.
1401 : : */
1402 : : static void
2642 peter_e@gmx.net 1403 : 186213 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1404 : : Relation relation, ReorderBufferChange *change)
1405 : : {
2524 bruce@momjian.us 1406 : 186213 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
746 akapila@postgresql.o 1407 : 186213 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1408 : : MemoryContext old;
1409 : : RelationSyncEntry *relentry;
1319 1410 : 186213 : TransactionId xid = InvalidTransactionId;
1188 1411 : 186213 : Relation ancestor = NULL;
782 1412 : 186213 : Relation targetrel = relation;
1413 : 186213 : ReorderBufferChangeType action = change->action;
1414 : 186213 : TupleTableSlot *old_slot = NULL;
1415 : 186213 : TupleTableSlot *new_slot = NULL;
1416 : :
2242 peter_e@gmx.net 1417 [ + + ]: 186213 : if (!is_publishable_relation(relation))
1418 : 4122 : return;
1419 : :
1420 : : /*
1421 : : * Remember the xid for the change in streaming mode. We need to send xid
1422 : : * with each change in the streaming mode so that subscriber can make
1423 : : * their association and on aborts, it can discard the corresponding
1424 : : * changes.
1425 : : */
199 michael@paquier.xyz 1426 [ + + ]:GNC 186211 : if (data->in_streaming)
1319 akapila@postgresql.o 1427 :CBC 175907 : xid = change->txn->xid;
1428 : :
782 1429 : 186211 : relentry = get_rel_sync_entry(data, relation);
1430 : :
1431 : : /* First check the table filter */
1432 [ + + + - ]: 186208 : switch (action)
1433 : : {
2642 peter_e@gmx.net 1434 : 108868 : case REORDER_BUFFER_CHANGE_INSERT:
1435 [ + + ]: 108868 : if (!relentry->pubactions.pubinsert)
1436 : 3043 : return;
1437 : 105825 : break;
1438 : 34455 : case REORDER_BUFFER_CHANGE_UPDATE:
1439 [ + + ]: 34455 : if (!relentry->pubactions.pubupdate)
1440 : 43 : return;
1441 : 34412 : break;
1442 : 42885 : case REORDER_BUFFER_CHANGE_DELETE:
1443 [ + + ]: 42885 : if (!relentry->pubactions.pubdelete)
1444 : 1034 : return;
1445 : :
1446 : : /*
1447 : : * This is only possible if deletes are allowed even when replica
1448 : : * identity is not defined for a table. Since the DELETE action
1449 : : * can't be published, we simply return.
1450 : : */
381 akapila@postgresql.o 1451 [ - + ]: 41851 : if (!change->data.tp.oldtuple)
1452 : : {
381 akapila@postgresql.o 1453 [ # # ]:UBC 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1454 : 0 : return;
1455 : : }
2642 peter_e@gmx.net 1456 :CBC 41851 : break;
2642 peter_e@gmx.net 1457 :UBC 0 : default:
1458 : 0 : Assert(false);
1459 : : }
1460 : :
1461 : : /* Avoid leaking memory by using and resetting our own context */
2642 peter_e@gmx.net 1462 :CBC 182088 : old = MemoryContextSwitchTo(data->context);
1463 : :
1464 : : /* Switch relation if publishing via root. */
381 akapila@postgresql.o 1465 [ + + ]: 182088 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1466 : : {
1467 [ - + ]: 51 : Assert(relation->rd_rel->relispartition);
1468 : 51 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1469 : 51 : targetrel = ancestor;
1470 : : }
1471 : :
1472 [ + + ]: 182088 : if (change->data.tp.oldtuple)
1473 : : {
1474 : 41966 : old_slot = relentry->old_slot;
76 msawada@postgresql.o 1475 :GNC 41966 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1476 : :
1477 : : /* Convert tuple if needed. */
381 akapila@postgresql.o 1478 [ - + ]:CBC 41966 : if (relentry->attrmap)
1479 : : {
381 akapila@postgresql.o 1480 :UBC 0 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1481 : : &TTSOpsVirtual);
1482 : :
1483 : 0 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1484 : : }
1485 : : }
1486 : :
381 akapila@postgresql.o 1487 [ + + ]:CBC 182088 : if (change->data.tp.newtuple)
1488 : : {
1489 : 140237 : new_slot = relentry->new_slot;
76 msawada@postgresql.o 1490 :GNC 140237 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1491 : :
1492 : : /* Convert tuple if needed. */
381 akapila@postgresql.o 1493 [ + + ]:CBC 140237 : if (relentry->attrmap)
1494 : : {
1495 : 9 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1496 : : &TTSOpsVirtual);
1497 : :
1498 : 9 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1499 : : }
1500 : : }
1501 : :
1502 : : /*
1503 : : * Check row filter.
1504 : : *
1505 : : * Updates could be transformed to inserts or deletes based on the results
1506 : : * of the row filter for old and new tuple.
1507 : : */
1508 [ + + ]: 182088 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1509 : 11 : goto cleanup;
1510 : :
1511 : : /*
1512 : : * Send BEGIN if we haven't yet.
1513 : : *
1514 : : * We send the BEGIN message after ensuring that we will actually send the
1515 : : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1516 : : * transactions.
1517 : : */
1518 [ + + + + ]: 182077 : if (txndata && !txndata->sent_begin_txn)
1519 : 350 : pgoutput_send_begin(ctx, txn);
1520 : :
1521 : : /*
1522 : : * Schema should be sent using the original relation because it also sends
1523 : : * the ancestor's relation.
1524 : : */
1525 : 182077 : maybe_send_schema(ctx, change, relation, relentry);
1526 : :
1527 : 182077 : OutputPluginPrepareWrite(ctx, true);
1528 : :
1529 : : /* Send the data */
1530 [ + + + - ]: 182077 : switch (action)
1531 : : {
1532 : 105816 : case REORDER_BUFFER_CHANGE_INSERT:
1533 : 105816 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1534 : 105816 : data->binary, relentry->columns);
1535 : 105816 : break;
1536 : 34409 : case REORDER_BUFFER_CHANGE_UPDATE:
1537 : 34409 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1538 : 34409 : new_slot, data->binary, relentry->columns);
782 1539 : 34409 : break;
2642 peter_e@gmx.net 1540 : 41852 : case REORDER_BUFFER_CHANGE_DELETE:
381 akapila@postgresql.o 1541 : 41852 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1542 : 41852 : data->binary, relentry->columns);
2642 peter_e@gmx.net 1543 : 41852 : break;
2642 peter_e@gmx.net 1544 :UBC 0 : default:
1545 : 0 : Assert(false);
1546 : : }
1547 : :
381 akapila@postgresql.o 1548 :CBC 182077 : OutputPluginWrite(ctx, true);
1549 : :
1550 : 182088 : cleanup:
1188 1551 [ + + ]: 182088 : if (RelationIsValid(ancestor))
1552 : : {
1553 : 51 : RelationClose(ancestor);
1554 : 51 : ancestor = NULL;
1555 : : }
1556 : :
2642 peter_e@gmx.net 1557 : 182088 : MemoryContextSwitchTo(old);
1558 : 182088 : MemoryContextReset(data->context);
1559 : : }
1560 : :
1561 : : static void
2199 1562 : 12 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1563 : : int nrelations, Relation relations[], ReorderBufferChange *change)
1564 : : {
1565 : 12 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
746 akapila@postgresql.o 1566 : 12 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1567 : : MemoryContext old;
1568 : : RelationSyncEntry *relentry;
1569 : : int i;
1570 : : int nrelids;
1571 : : Oid *relids;
1319 1572 : 12 : TransactionId xid = InvalidTransactionId;
1573 : :
1574 : : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
199 michael@paquier.xyz 1575 [ - + ]:GNC 12 : if (data->in_streaming)
1319 akapila@postgresql.o 1576 :UBC 0 : xid = change->txn->xid;
1577 : :
2199 peter_e@gmx.net 1578 :CBC 12 : old = MemoryContextSwitchTo(data->context);
1579 : :
1580 : 12 : relids = palloc0(nrelations * sizeof(Oid));
1581 : 12 : nrelids = 0;
1582 : :
1583 [ + + ]: 33 : for (i = 0; i < nrelations; i++)
1584 : : {
1585 : 21 : Relation relation = relations[i];
1586 : 21 : Oid relid = RelationGetRelid(relation);
1587 : :
1588 [ - + ]: 21 : if (!is_publishable_relation(relation))
2199 peter_e@gmx.net 1589 :UBC 0 : continue;
1590 : :
782 akapila@postgresql.o 1591 :CBC 21 : relentry = get_rel_sync_entry(data, relation);
1592 : :
2199 peter_e@gmx.net 1593 [ + + ]: 21 : if (!relentry->pubactions.pubtruncate)
1594 : 8 : continue;
1595 : :
1596 : : /*
1597 : : * Don't send partitions if the publication wants to send only the
1598 : : * root tables through it.
1599 : : */
1467 peter@eisentraut.org 1600 [ + + ]: 13 : if (relation->rd_rel->relispartition &&
1601 [ - + ]: 10 : relentry->publish_as_relid != relid)
1496 peter@eisentraut.org 1602 :UBC 0 : continue;
1603 : :
2199 peter_e@gmx.net 1604 :CBC 13 : relids[nrelids++] = relid;
1605 : :
1606 : : /* Send BEGIN if we haven't yet */
746 akapila@postgresql.o 1607 [ + - + + ]: 13 : if (txndata && !txndata->sent_begin_txn)
1608 : 8 : pgoutput_send_begin(ctx, txn);
1609 : :
983 fujii@postgresql.org 1610 : 13 : maybe_send_schema(ctx, change, relation, relentry);
1611 : : }
1612 : :
2176 peter_e@gmx.net 1613 [ + + ]: 12 : if (nrelids > 0)
1614 : : {
1615 : 8 : OutputPluginPrepareWrite(ctx, true);
1616 : 8 : logicalrep_write_truncate(ctx->out,
1617 : : xid,
1618 : : nrelids,
1619 : : relids,
1620 : 8 : change->data.truncate.cascade,
1621 : 8 : change->data.truncate.restart_seqs);
1622 : 8 : OutputPluginWrite(ctx, true);
1623 : : }
1624 : :
2199 1625 : 12 : MemoryContextSwitchTo(old);
1626 : 12 : MemoryContextReset(data->context);
1627 : 12 : }
1628 : :
1629 : : static void
1104 akapila@postgresql.o 1630 : 7 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1631 : : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1632 : : const char *message)
1633 : : {
1634 : 7 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1635 : 7 : TransactionId xid = InvalidTransactionId;
1636 : :
1637 [ + + ]: 7 : if (!data->messages)
1638 : 2 : return;
1639 : :
1640 : : /*
1641 : : * Remember the xid for the message in streaming mode. See
1642 : : * pgoutput_change.
1643 : : */
199 michael@paquier.xyz 1644 [ - + ]:GNC 5 : if (data->in_streaming)
1104 akapila@postgresql.o 1645 :UBC 0 : xid = txn->xid;
1646 : :
1647 : : /*
1648 : : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1649 : : */
746 akapila@postgresql.o 1650 [ + + ]:CBC 5 : if (transactional)
1651 : : {
1652 : 2 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1653 : :
1654 : : /* Send BEGIN if we haven't yet */
1655 [ + - + - ]: 2 : if (txndata && !txndata->sent_begin_txn)
1656 : 2 : pgoutput_send_begin(ctx, txn);
1657 : : }
1658 : :
1104 1659 : 5 : OutputPluginPrepareWrite(ctx, true);
1660 : 5 : logicalrep_write_message(ctx->out,
1661 : : xid,
1662 : : message_lsn,
1663 : : transactional,
1664 : : prefix,
1665 : : sz,
1666 : : message);
1667 : 5 : OutputPluginWrite(ctx, true);
1668 : : }
1669 : :
1670 : : /*
1671 : : * Return true if the data is associated with an origin and the user has
1672 : : * requested the changes that don't have an origin, false otherwise.
1673 : : */
1674 : : static bool
2642 peter_e@gmx.net 1675 : 305867 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1676 : : RepOriginId origin_id)
1677 : : {
200 akapila@postgresql.o 1678 : 305867 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1679 : :
200 akapila@postgresql.o 1680 [ + + + + ]:GNC 305867 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
633 akapila@postgresql.o 1681 :CBC 44 : return true;
1682 : :
2642 peter_e@gmx.net 1683 : 305823 : return false;
1684 : : }
1685 : :
1686 : : /*
1687 : : * Shutdown the output plugin.
1688 : : *
1689 : : * Note, we don't need to clean the data->context and data->cachectx as
1690 : : * they are child contexts of the ctx->context so they will be cleaned up by
1691 : : * logical decoding machinery.
1692 : : */
1693 : : static void
2524 bruce@momjian.us 1694 : 444 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1695 : : {
2642 peter_e@gmx.net 1696 [ + + ]: 444 : if (RelationSyncCache)
1697 : : {
1698 : 168 : hash_destroy(RelationSyncCache);
1699 : 168 : RelationSyncCache = NULL;
1700 : : }
1701 : 444 : }
1702 : :
1703 : : /*
1704 : : * Load publications from the list of publication names.
1705 : : */
1706 : : static List *
1707 : 145 : LoadPublications(List *pubnames)
1708 : : {
1709 : 145 : List *result = NIL;
1710 : : ListCell *lc;
1711 : :
2524 bruce@momjian.us 1712 [ + - + + : 331 : foreach(lc, pubnames)
+ + ]
1713 : : {
1714 : 188 : char *pubname = (char *) lfirst(lc);
1715 : 188 : Publication *pub = GetPublicationByName(pubname, false);
1716 : :
2642 peter_e@gmx.net 1717 : 186 : result = lappend(result, pub);
1718 : : }
1719 : :
1720 : 143 : return result;
1721 : : }
1722 : :
1723 : : /*
1724 : : * Publication syscache invalidation callback.
1725 : : *
1726 : : * Called for invalidations on pg_publication.
1727 : : */
1728 : : static void
1729 : 228 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1730 : : {
1731 : 228 : publications_valid = false;
1732 : :
1733 : : /*
1734 : : * Also invalidate per-relation cache so that next time the filtering info
1735 : : * is checked it will be updated with the new publication settings.
1736 : : */
1737 : 228 : rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1738 : 228 : }
1739 : :
1740 : : /*
1741 : : * START STREAM callback
1742 : : */
1743 : : static void
1319 akapila@postgresql.o 1744 : 612 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1745 : : ReorderBufferTXN *txn)
1746 : : {
199 michael@paquier.xyz 1747 :GNC 612 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1319 akapila@postgresql.o 1748 :CBC 612 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1749 : :
1750 : : /* we can't nest streaming of transactions */
199 michael@paquier.xyz 1751 [ - + ]:GNC 612 : Assert(!data->in_streaming);
1752 : :
1753 : : /*
1754 : : * If we already sent the first stream for this transaction then don't
1755 : : * send the origin id in the subsequent streams.
1756 : : */
1319 akapila@postgresql.o 1757 [ + + ]:CBC 612 : if (rbtxn_is_streamed(txn))
1758 : 556 : send_replication_origin = false;
1759 : :
1760 : 612 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1761 : 612 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1762 : :
1005 1763 : 612 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1764 : : send_replication_origin);
1765 : :
1319 1766 : 612 : OutputPluginWrite(ctx, true);
1767 : :
1768 : : /* we're streaming a chunk of transaction now */
199 michael@paquier.xyz 1769 :GNC 612 : data->in_streaming = true;
1319 akapila@postgresql.o 1770 :CBC 612 : }
1771 : :
1772 : : /*
1773 : : * STOP STREAM callback
1774 : : */
1775 : : static void
1776 : 612 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1777 : : ReorderBufferTXN *txn)
1778 : : {
199 michael@paquier.xyz 1779 :GNC 612 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1780 : :
1781 : : /* we should be streaming a transaction */
1782 [ - + ]: 612 : Assert(data->in_streaming);
1783 : :
1319 akapila@postgresql.o 1784 :CBC 612 : OutputPluginPrepareWrite(ctx, true);
1785 : 612 : logicalrep_write_stream_stop(ctx->out);
1786 : 612 : OutputPluginWrite(ctx, true);
1787 : :
1788 : : /* we've stopped streaming a transaction */
199 michael@paquier.xyz 1789 :GNC 612 : data->in_streaming = false;
1319 akapila@postgresql.o 1790 :CBC 612 : }
1791 : :
1792 : : /*
1793 : : * Notify downstream to discard the streamed transaction (along with all
1794 : : * it's subtransactions, if it's a toplevel transaction).
1795 : : */
1796 : : static void
1797 : 26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1798 : : ReorderBufferTXN *txn,
1799 : : XLogRecPtr abort_lsn)
1800 : : {
1801 : : ReorderBufferTXN *toptxn;
461 1802 : 26 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1803 : 26 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1804 : :
1805 : : /*
1806 : : * The abort should happen outside streaming block, even for streamed
1807 : : * transactions. The transaction has to be marked as streamed, though.
1808 : : */
199 michael@paquier.xyz 1809 [ - + ]:GNC 26 : Assert(!data->in_streaming);
1810 : :
1811 : : /* determine the toplevel transaction */
394 akapila@postgresql.o 1812 [ + + ]:CBC 26 : toptxn = rbtxn_get_toptxn(txn);
1813 : :
1319 1814 [ - + ]: 26 : Assert(rbtxn_is_streamed(toptxn));
1815 : :
1816 : 26 : OutputPluginPrepareWrite(ctx, true);
461 1817 : 26 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1818 : : txn->xact_time.abort_time, write_abort_info);
1819 : :
1319 1820 : 26 : OutputPluginWrite(ctx, true);
1821 : :
1822 : 26 : cleanup_rel_sync_cache(toptxn->xid, false);
1823 : 26 : }
1824 : :
1825 : : /*
1826 : : * Notify downstream to apply the streamed transaction (along with all
1827 : : * it's subtransactions).
1828 : : */
1829 : : static void
1830 : 44 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1831 : : ReorderBufferTXN *txn,
1832 : : XLogRecPtr commit_lsn)
1833 : : {
199 michael@paquier.xyz 1834 :GNC 44 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1835 : :
1836 : : /*
1837 : : * The commit should happen outside streaming block, even for streamed
1838 : : * transactions. The transaction has to be marked as streamed, though.
1839 : : */
1840 [ - + ]: 44 : Assert(!data->in_streaming);
1319 akapila@postgresql.o 1841 [ - + ]:CBC 44 : Assert(rbtxn_is_streamed(txn));
1842 : :
431 1843 : 44 : OutputPluginUpdateProgress(ctx, false);
1844 : :
1319 1845 : 44 : OutputPluginPrepareWrite(ctx, true);
1846 : 44 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1847 : 44 : OutputPluginWrite(ctx, true);
1848 : :
1849 : 44 : cleanup_rel_sync_cache(txn->xid, true);
1850 : 44 : }
1851 : :
1852 : : /*
1853 : : * PREPARE callback (for streaming two-phase commit).
1854 : : *
1855 : : * Notify the downstream to prepare the transaction.
1856 : : */
1857 : : static void
984 1858 : 9 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1859 : : ReorderBufferTXN *txn,
1860 : : XLogRecPtr prepare_lsn)
1861 : : {
1862 [ - + ]: 9 : Assert(rbtxn_is_streamed(txn));
1863 : :
431 1864 : 9 : OutputPluginUpdateProgress(ctx, false);
984 1865 : 9 : OutputPluginPrepareWrite(ctx, true);
1866 : 9 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1867 : 9 : OutputPluginWrite(ctx, true);
1868 : 9 : }
1869 : :
1870 : : /*
1871 : : * Initialize the relation schema sync cache for a decoding session.
1872 : : *
1873 : : * The hash table is destroyed at the end of a decoding session. While
1874 : : * relcache invalidations still exist and will still be invoked, they
1875 : : * will just see the null hash table global and take no action.
1876 : : */
1877 : : static void
2642 peter_e@gmx.net 1878 : 318 : init_rel_sync_cache(MemoryContext cachectx)
1879 : : {
1880 : : HASHCTL ctl;
1881 : : static bool relation_callbacks_registered = false;
1882 : :
1883 : : /* Nothing to do if hash table already exists */
1884 [ - + ]: 318 : if (RelationSyncCache != NULL)
2642 peter_e@gmx.net 1885 :UBC 0 : return;
1886 : :
1887 : : /* Make a new hash table for the cache */
2642 peter_e@gmx.net 1888 :CBC 318 : ctl.keysize = sizeof(Oid);
1889 : 318 : ctl.entrysize = sizeof(RelationSyncEntry);
1890 : 318 : ctl.hcxt = cachectx;
1891 : :
1892 : 318 : RelationSyncCache = hash_create("logical replication output relation cache",
1893 : : 128, &ctl,
1894 : : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1895 : :
1896 [ - + ]: 318 : Assert(RelationSyncCache != NULL);
1897 : :
1898 : : /* No more to do if we already registered callbacks */
416 tgl@sss.pgh.pa.us 1899 [ + + ]: 318 : if (relation_callbacks_registered)
1900 : 1 : return;
1901 : :
1902 : : /* We must update the cache entry for a relation after a relcache flush */
2642 peter_e@gmx.net 1903 : 317 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1904 : :
1905 : : /*
1906 : : * Flush all cache entries after a pg_namespace change, in case it was a
1907 : : * schema rename affecting a relation being replicated.
1908 : : */
464 tgl@sss.pgh.pa.us 1909 : 317 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1910 : : rel_sync_cache_publication_cb,
1911 : : (Datum) 0);
1912 : :
1913 : : /*
1914 : : * Flush all cache entries after any publication changes. (We need no
1915 : : * callback entry for pg_publication, because publication_invalidation_cb
1916 : : * will take care of it.)
1917 : : */
2642 peter_e@gmx.net 1918 : 317 : CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1919 : : rel_sync_cache_publication_cb,
1920 : : (Datum) 0);
900 akapila@postgresql.o 1921 : 317 : CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1922 : : rel_sync_cache_publication_cb,
1923 : : (Datum) 0);
1924 : :
416 tgl@sss.pgh.pa.us 1925 : 317 : relation_callbacks_registered = true;
1926 : : }
1927 : :
1928 : : /*
1929 : : * We expect relatively small number of streamed transactions.
1930 : : */
1931 : : static bool
1319 akapila@postgresql.o 1932 : 175907 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1933 : : {
650 alvherre@alvh.no-ip. 1934 : 175907 : return list_member_xid(entry->streamed_txns, xid);
1935 : : }
1936 : :
1937 : : /*
1938 : : * Add the xid in the rel sync entry for which we have already sent the schema
1939 : : * of the relation.
1940 : : */
1941 : : static void
1319 akapila@postgresql.o 1942 : 66 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1943 : : {
1944 : : MemoryContext oldctx;
1945 : :
1946 : 66 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1947 : :
650 alvherre@alvh.no-ip. 1948 : 66 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1949 : :
1319 akapila@postgresql.o 1950 : 66 : MemoryContextSwitchTo(oldctx);
1951 : 66 : }
1952 : :
1953 : : /*
1954 : : * Find or create entry in the relation schema cache.
1955 : : *
1956 : : * This looks up publications that the given relation is directly or
1957 : : * indirectly part of (the latter if it's really the relation's ancestor that
1958 : : * is part of a publication) and fills up the found entry with the information
1959 : : * about which operations to publish and whether to use an ancestor's schema
1960 : : * when publishing.
1961 : : */
1962 : : static RelationSyncEntry *
782 1963 : 186232 : get_rel_sync_entry(PGOutputData *data, Relation relation)
1964 : : {
1965 : : RelationSyncEntry *entry;
1966 : : bool found;
1967 : : MemoryContext oldctx;
1968 : 186232 : Oid relid = RelationGetRelid(relation);
1969 : :
2642 peter_e@gmx.net 1970 [ - + ]: 186232 : Assert(RelationSyncCache != NULL);
1971 : :
1972 : : /* Find cached relation info, creating if not found */
1973 : 186232 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1974 : : &relid,
1975 : : HASH_ENTER, &found);
1976 [ - + ]: 186232 : Assert(entry != NULL);
1977 : :
1978 : : /* initialize entry, if it's new */
1306 akapila@postgresql.o 1979 [ + + ]: 186232 : if (!found)
1980 : : {
800 1981 : 227 : entry->replicate_valid = false;
1306 1982 : 227 : entry->schema_sent = false;
1983 : 227 : entry->streamed_txns = NIL;
1984 : 227 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
738 tomas.vondra@postgre 1985 : 227 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
782 akapila@postgresql.o 1986 : 227 : entry->new_slot = NULL;
1987 : 227 : entry->old_slot = NULL;
1988 : 227 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
750 tomas.vondra@postgre 1989 : 227 : entry->entry_cxt = NULL;
1306 akapila@postgresql.o 1990 : 227 : entry->publish_as_relid = InvalidOid;
750 tomas.vondra@postgre 1991 : 227 : entry->columns = NULL;
782 akapila@postgresql.o 1992 : 227 : entry->attrmap = NULL;
1993 : : }
1994 : :
1995 : : /* Validate the entry */
1306 1996 [ + + ]: 186232 : if (!entry->replicate_valid)
1997 : : {
900 1998 : 284 : Oid schemaId = get_rel_namespace(relid);
2642 peter_e@gmx.net 1999 : 284 : List *pubids = GetRelationPublications(relid);
2000 : :
2001 : : /*
2002 : : * We don't acquire a lock on the namespace system table as we build
2003 : : * the cache entry using a historic snapshot and all the later changes
2004 : : * are absorbed while decoding WAL.
2005 : : */
738 tomas.vondra@postgre 2006 : 284 : List *schemaPubids = GetSchemaPublications(schemaId);
2007 : : ListCell *lc;
1467 peter@eisentraut.org 2008 : 284 : Oid publish_as_relid = relid;
760 tomas.vondra@postgre 2009 : 284 : int publish_ancestor_level = 0;
830 michael@paquier.xyz 2010 : 284 : bool am_partition = get_rel_relispartition(relid);
738 tomas.vondra@postgre 2011 : 284 : char relkind = get_rel_relkind(relid);
782 akapila@postgresql.o 2012 : 284 : List *rel_publications = NIL;
2013 : :
2014 : : /* Reload publications if needed before use. */
2642 peter_e@gmx.net 2015 [ + + ]: 284 : if (!publications_valid)
2016 : : {
2017 : 145 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2018 [ + + ]: 145 : if (data->publications)
2019 : : {
2020 : 19 : list_free_deep(data->publications);
800 akapila@postgresql.o 2021 : 19 : data->publications = NIL;
2022 : : }
2642 peter_e@gmx.net 2023 : 145 : data->publications = LoadPublications(data->publication_names);
2024 : 143 : MemoryContextSwitchTo(oldctx);
2025 : 143 : publications_valid = true;
2026 : : }
2027 : :
2028 : : /*
2029 : : * Reset schema_sent status as the relation definition may have
2030 : : * changed. Also reset pubactions to empty in case rel was dropped
2031 : : * from a publication. Also free any objects that depended on the
2032 : : * earlier definition.
2033 : : */
800 akapila@postgresql.o 2034 : 282 : entry->schema_sent = false;
2035 : 282 : list_free(entry->streamed_txns);
2036 : 282 : entry->streamed_txns = NIL;
750 tomas.vondra@postgre 2037 : 282 : bms_free(entry->columns);
2038 : 282 : entry->columns = NULL;
800 akapila@postgresql.o 2039 : 282 : entry->pubactions.pubinsert = false;
2040 : 282 : entry->pubactions.pubupdate = false;
2041 : 282 : entry->pubactions.pubdelete = false;
2042 : 282 : entry->pubactions.pubtruncate = false;
2043 : :
2044 : : /*
2045 : : * Tuple slots cleanups. (Will be rebuilt later if needed).
2046 : : */
782 2047 [ + + ]: 282 : if (entry->old_slot)
2048 : 51 : ExecDropSingleTupleTableSlot(entry->old_slot);
2049 [ + + ]: 282 : if (entry->new_slot)
2050 : 51 : ExecDropSingleTupleTableSlot(entry->new_slot);
2051 : :
2052 : 282 : entry->old_slot = NULL;
2053 : 282 : entry->new_slot = NULL;
2054 : :
2055 [ - + ]: 282 : if (entry->attrmap)
782 akapila@postgresql.o 2056 :UBC 0 : free_attrmap(entry->attrmap);
782 akapila@postgresql.o 2057 :CBC 282 : entry->attrmap = NULL;
2058 : :
2059 : : /*
2060 : : * Row filter cache cleanups.
2061 : : */
750 tomas.vondra@postgre 2062 [ + + ]: 282 : if (entry->entry_cxt)
2063 : 10 : MemoryContextDelete(entry->entry_cxt);
2064 : :
2065 : 282 : entry->entry_cxt = NULL;
782 akapila@postgresql.o 2066 : 282 : entry->estate = NULL;
2067 : 282 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2068 : :
2069 : : /*
2070 : : * Build publication cache. We can't use one provided by relcache as
2071 : : * relcache considers all publications that the given relation is in,
2072 : : * but here we only need to consider ones that the subscriber
2073 : : * requested.
2074 : : */
2642 peter_e@gmx.net 2075 [ + - + + : 712 : foreach(lc, data->publications)
+ + ]
2076 : : {
2077 : 430 : Publication *pub = lfirst(lc);
1467 peter@eisentraut.org 2078 : 430 : bool publish = false;
2079 : :
2080 : : /*
2081 : : * Under what relid should we publish changes in this publication?
2082 : : * We'll use the top-most relid across all publications. Also
2083 : : * track the ancestor level for this publication.
2084 : : */
703 tgl@sss.pgh.pa.us 2085 : 430 : Oid pub_relid = relid;
2086 : 430 : int ancestor_level = 0;
2087 : :
2088 : : /*
2089 : : * If this is a FOR ALL TABLES publication, pick the partition
2090 : : * root and set the ancestor level accordingly.
2091 : : */
738 tomas.vondra@postgre 2092 [ + + ]: 430 : if (pub->alltables)
2093 : : {
1467 peter@eisentraut.org 2094 : 72 : publish = true;
2095 [ + + + - ]: 72 : if (pub->pubviaroot && am_partition)
2096 : : {
760 tomas.vondra@postgre 2097 : 14 : List *ancestors = get_partition_ancestors(relid);
2098 : :
2099 : 14 : pub_relid = llast_oid(ancestors);
2100 : 14 : ancestor_level = list_length(ancestors);
2101 : : }
2102 : : }
2103 : :
1467 peter@eisentraut.org 2104 [ + + ]: 430 : if (!publish)
2105 : : {
1431 tgl@sss.pgh.pa.us 2106 : 358 : bool ancestor_published = false;
2107 : :
2108 : : /*
2109 : : * For a partition, check if any of the ancestors are
2110 : : * published. If so, note down the topmost ancestor that is
2111 : : * published via this publication, which will be used as the
2112 : : * relation via which to publish the partition's changes.
2113 : : */
1467 peter@eisentraut.org 2114 [ + + ]: 358 : if (am_partition)
2115 : : {
2116 : : Oid ancestor;
2117 : : int level;
1431 tgl@sss.pgh.pa.us 2118 : 99 : List *ancestors = get_partition_ancestors(relid);
2119 : :
782 akapila@postgresql.o 2120 : 99 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2121 : : ancestors,
2122 : : &level);
2123 : :
2124 [ + + ]: 99 : if (ancestor != InvalidOid)
2125 : : {
2126 : 40 : ancestor_published = true;
2127 [ + + ]: 40 : if (pub->pubviaroot)
2128 : : {
760 tomas.vondra@postgre 2129 : 17 : pub_relid = ancestor;
2130 : 17 : ancestor_level = level;
2131 : : }
2132 : : }
2133 : : }
2134 : :
900 akapila@postgresql.o 2135 [ + + + + ]: 559 : if (list_member_oid(pubids, pub->oid) ||
2136 [ + + ]: 396 : list_member_oid(schemaPubids, pub->oid) ||
2137 : : ancestor_published)
1467 peter@eisentraut.org 2138 : 188 : publish = true;
2139 : : }
2140 : :
2141 : : /*
2142 : : * If the relation is to be published, determine actions to
2143 : : * publish, and list of columns, if appropriate.
2144 : : *
2145 : : * Don't publish changes for partitioned tables, because
2146 : : * publishing those of its partitions suffices, unless partition
2147 : : * changes won't be published due to pubviaroot being set.
2148 : : */
2149 [ + + + + ]: 430 : if (publish &&
2150 [ - + ]: 3 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2151 : : {
2642 peter_e@gmx.net 2152 : 257 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2153 : 257 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2154 : 257 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2199 2155 : 257 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2156 : :
2157 : : /*
2158 : : * We want to publish the changes as the top-most ancestor
2159 : : * across all publications. So we need to check if the already
2160 : : * calculated level is higher than the new one. If yes, we can
2161 : : * ignore the new value (as it's a child). Otherwise the new
2162 : : * value is an ancestor, so we keep it.
2163 : : */
760 tomas.vondra@postgre 2164 [ + + ]: 257 : if (publish_ancestor_level > ancestor_level)
2165 : 1 : continue;
2166 : :
2167 : : /*
2168 : : * If we found an ancestor higher up in the tree, discard the
2169 : : * list of publications through which we replicate it, and use
2170 : : * the new ancestor.
2171 : : */
759 2172 [ + + ]: 256 : if (publish_ancestor_level < ancestor_level)
2173 : : {
2174 : 31 : publish_as_relid = pub_relid;
2175 : 31 : publish_ancestor_level = ancestor_level;
2176 : :
2177 : : /* reset the publication list for this relation */
2178 : 31 : rel_publications = NIL;
2179 : : }
2180 : : else
2181 : : {
2182 : : /* Same ancestor level, has to be the same OID. */
2183 [ - + ]: 225 : Assert(publish_as_relid == pub_relid);
2184 : : }
2185 : :
2186 : : /* Track publications for this ancestor. */
2187 : 256 : rel_publications = lappend(rel_publications, pub);
2188 : : }
2189 : : }
2190 : :
782 akapila@postgresql.o 2191 : 282 : entry->publish_as_relid = publish_as_relid;
2192 : :
2193 : : /*
2194 : : * Initialize the tuple slot, map, and row filter. These are only used
2195 : : * when publishing inserts, updates, or deletes.
2196 : : */
2197 [ + + + - ]: 282 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2198 [ - + ]: 39 : entry->pubactions.pubdelete)
2199 : : {
2200 : : /* Initialize the tuple slot and map */
2201 : 243 : init_tuple_slot(data, relation, entry);
2202 : :
2203 : : /* Initialize the row filter */
2204 : 243 : pgoutput_row_filter_init(data, rel_publications, entry);
2205 : :
2206 : : /* Initialize the column list */
750 tomas.vondra@postgre 2207 : 243 : pgoutput_column_list_init(data, rel_publications, entry);
2208 : : }
2209 : :
2642 peter_e@gmx.net 2210 : 281 : list_free(pubids);
800 akapila@postgresql.o 2211 : 281 : list_free(schemaPubids);
782 2212 : 281 : list_free(rel_publications);
2213 : :
2642 peter_e@gmx.net 2214 : 281 : entry->replicate_valid = true;
2215 : : }
2216 : :
2217 : 186229 : return entry;
2218 : : }
2219 : :
2220 : : /*
2221 : : * Cleanup list of streamed transactions and update the schema_sent flag.
2222 : : *
2223 : : * When a streamed transaction commits or aborts, we need to remove the
2224 : : * toplevel XID from the schema cache. If the transaction aborted, the
2225 : : * subscriber will simply throw away the schema records we streamed, so
2226 : : * we don't need to do anything else.
2227 : : *
2228 : : * If the transaction is committed, the subscriber will update the relation
2229 : : * cache - so tweak the schema_sent flag accordingly.
2230 : : */
2231 : : static void
1319 akapila@postgresql.o 2232 : 70 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2233 : : {
2234 : : HASH_SEQ_STATUS hash_seq;
2235 : : RelationSyncEntry *entry;
2236 : :
2237 [ - + ]: 70 : Assert(RelationSyncCache != NULL);
2238 : :
2239 : 70 : hash_seq_init(&hash_seq, RelationSyncCache);
2240 [ + + ]: 143 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2241 : : {
2242 : : /*
2243 : : * We can set the schema_sent flag for an entry that has committed xid
2244 : : * in the list as that ensures that the subscriber would have the
2245 : : * corresponding schema and we don't need to send it unless there is
2246 : : * any invalidation for that relation.
2247 : : */
101 nathan@postgresql.or 2248 [ + + + - :GNC 169 : foreach_xid(streamed_txn, entry->streamed_txns)
+ + ]
2249 : : {
2250 [ + + ]: 75 : if (xid == streamed_txn)
2251 : : {
1319 akapila@postgresql.o 2252 [ + + ]:CBC 52 : if (is_commit)
2253 : 41 : entry->schema_sent = true;
2254 : :
2255 : 52 : entry->streamed_txns =
101 nathan@postgresql.or 2256 :GNC 52 : foreach_delete_current(entry->streamed_txns, streamed_txn);
1319 akapila@postgresql.o 2257 :CBC 52 : break;
2258 : : }
2259 : : }
2260 : : }
2261 : 70 : }
2262 : :
2263 : : /*
2264 : : * Relcache invalidation callback
2265 : : */
2266 : : static void
2642 peter_e@gmx.net 2267 : 3053 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2268 : : {
2269 : : RelationSyncEntry *entry;
2270 : :
2271 : : /*
2272 : : * We can get here if the plugin was used in SQL interface as the
2273 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2274 : : * no way to unregister the relcache invalidation callback.
2275 : : */
2276 [ + + ]: 3053 : if (RelationSyncCache == NULL)
2277 : 9 : return;
2278 : :
2279 : : /*
2280 : : * Nobody keeps pointers to entries in this hash table around outside
2281 : : * logical decoding callback calls - but invalidation events can come in
2282 : : * *during* a callback if we do any syscache access in the callback.
2283 : : * Because of that we must mark the cache entry as invalid but not damage
2284 : : * any of its substructure here. The next get_rel_sync_entry() call will
2285 : : * rebuild it all.
2286 : : */
800 akapila@postgresql.o 2287 [ + + ]: 3044 : if (OidIsValid(relid))
2288 : : {
2289 : : /*
2290 : : * Getting invalidations for relations that aren't in the table is
2291 : : * entirely normal. So we don't care if it's found or not.
2292 : : */
2293 : 3018 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2294 : : HASH_FIND, NULL);
2295 [ + + ]: 3018 : if (entry != NULL)
2296 : 458 : entry->replicate_valid = false;
2297 : : }
2298 : : else
2299 : : {
2300 : : /* Whole cache must be flushed. */
2301 : : HASH_SEQ_STATUS status;
2302 : :
2303 : 26 : hash_seq_init(&status, RelationSyncCache);
2304 [ + + ]: 56 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2305 : : {
2306 : 30 : entry->replicate_valid = false;
2307 : : }
2308 : : }
2309 : : }
2310 : :
2311 : : /*
2312 : : * Publication relation/schema map syscache invalidation callback
2313 : : *
2314 : : * Called for invalidations on pg_publication, pg_publication_rel,
2315 : : * pg_publication_namespace, and pg_namespace.
2316 : : */
2317 : : static void
2642 peter_e@gmx.net 2318 : 629 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2319 : : {
2320 : : HASH_SEQ_STATUS status;
2321 : : RelationSyncEntry *entry;
2322 : :
2323 : : /*
2324 : : * We can get here if the plugin was used in SQL interface as the
2325 : : * RelationSyncCache is destroyed when the decoding finishes, but there is
2326 : : * no way to unregister the invalidation callbacks.
2327 : : */
2328 [ + + ]: 629 : if (RelationSyncCache == NULL)
2329 : 34 : return;
2330 : :
2331 : : /*
2332 : : * We have no easy way to identify which cache entries this invalidation
2333 : : * event might have affected, so just mark them all invalid.
2334 : : */
2335 : 595 : hash_seq_init(&status, RelationSyncCache);
2336 [ + + ]: 1825 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2337 : : {
2338 : 1230 : entry->replicate_valid = false;
2339 : : }
2340 : : }
2341 : :
2342 : : /* Send Replication origin */
2343 : : static void
1005 akapila@postgresql.o 2344 : 986 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2345 : : XLogRecPtr origin_lsn, bool send_origin)
2346 : : {
2347 [ + + ]: 986 : if (send_origin)
2348 : : {
2349 : : char *origin;
2350 : :
2351 : : /*----------
2352 : : * XXX: which behaviour do we want here?
2353 : : *
2354 : : * Alternatives:
2355 : : * - don't send origin message if origin name not found
2356 : : * (that's what we do now)
2357 : : * - throw error - that will break replication, not good
2358 : : * - send some special "unknown" origin
2359 : : *----------
2360 : : */
2361 [ + - ]: 9 : if (replorigin_by_oid(origin_id, true, &origin))
2362 : : {
2363 : : /* Message boundary */
2364 : 9 : OutputPluginWrite(ctx, false);
2365 : 9 : OutputPluginPrepareWrite(ctx, true);
2366 : :
2367 : 9 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2368 : : }
2369 : : }
2370 : 986 : }
|