Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * test_decoding.c
4 : : * example logical decoding output plugin
5 : : *
6 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * contrib/test_decoding/test_decoding.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "catalog/pg_type.h"
16 : :
17 : : #include "replication/logical.h"
18 : : #include "replication/origin.h"
19 : :
20 : : #include "utils/builtins.h"
21 : : #include "utils/lsyscache.h"
22 : : #include "utils/memutils.h"
23 : : #include "utils/rel.h"
24 : :
3695 rhaas@postgresql.org 25 :CBC 100 : PG_MODULE_MAGIC;
26 : :
27 : : typedef struct
28 : : {
29 : : MemoryContext context;
30 : : bool include_xids;
31 : : bool include_timestamp;
32 : : bool skip_empty_xacts;
33 : : bool only_local;
34 : : } TestDecodingData;
35 : :
36 : : /*
37 : : * Maintain the per-transaction level variables to track whether the
38 : : * transaction and or streams have written any changes. In streaming mode the
39 : : * transaction can be decoded in streams so along with maintaining whether the
40 : : * transaction has written any changes, we also need to track whether the
41 : : * current stream has written any changes. This is required so that if user
42 : : * has requested to skip the empty transactions we can skip the empty streams
43 : : * even though the transaction has written some changes.
44 : : */
45 : : typedef struct
46 : : {
47 : : bool xact_wrote_changes;
48 : : bool stream_wrote_changes;
49 : : } TestDecodingTxnData;
50 : :
51 : : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
52 : : bool is_init);
53 : : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
54 : : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
55 : : ReorderBufferTXN *txn);
56 : : static void pg_output_begin(LogicalDecodingContext *ctx,
57 : : TestDecodingData *data,
58 : : ReorderBufferTXN *txn,
59 : : bool last_write);
60 : : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
61 : : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
62 : : static void pg_decode_change(LogicalDecodingContext *ctx,
63 : : ReorderBufferTXN *txn, Relation relation,
64 : : ReorderBufferChange *change);
65 : : static void pg_decode_truncate(LogicalDecodingContext *ctx,
66 : : ReorderBufferTXN *txn,
67 : : int nrelations, Relation relations[],
68 : : ReorderBufferChange *change);
69 : : static bool pg_decode_filter(LogicalDecodingContext *ctx,
70 : : RepOriginId origin_id);
71 : : static void pg_decode_message(LogicalDecodingContext *ctx,
72 : : ReorderBufferTXN *txn, XLogRecPtr lsn,
73 : : bool transactional, const char *prefix,
74 : : Size sz, const char *message);
75 : : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
76 : : TransactionId xid,
77 : : const char *gid);
78 : : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
79 : : ReorderBufferTXN *txn);
80 : : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
81 : : ReorderBufferTXN *txn,
82 : : XLogRecPtr prepare_lsn);
83 : : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
84 : : ReorderBufferTXN *txn,
85 : : XLogRecPtr commit_lsn);
86 : : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
87 : : ReorderBufferTXN *txn,
88 : : XLogRecPtr prepare_end_lsn,
89 : : TimestampTz prepare_time);
90 : : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
91 : : ReorderBufferTXN *txn);
92 : : static void pg_output_stream_start(LogicalDecodingContext *ctx,
93 : : TestDecodingData *data,
94 : : ReorderBufferTXN *txn,
95 : : bool last_write);
96 : : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
97 : : ReorderBufferTXN *txn);
98 : : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
99 : : ReorderBufferTXN *txn,
100 : : XLogRecPtr abort_lsn);
101 : : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
102 : : ReorderBufferTXN *txn,
103 : : XLogRecPtr prepare_lsn);
104 : : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
105 : : ReorderBufferTXN *txn,
106 : : XLogRecPtr commit_lsn);
107 : : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
108 : : ReorderBufferTXN *txn,
109 : : Relation relation,
110 : : ReorderBufferChange *change);
111 : : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
112 : : ReorderBufferTXN *txn, XLogRecPtr lsn,
113 : : bool transactional, const char *prefix,
114 : : Size sz, const char *message);
115 : : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
116 : : ReorderBufferTXN *txn,
117 : : int nrelations, Relation relations[],
118 : : ReorderBufferChange *change);
119 : :
120 : : void
121 : 100 : _PG_init(void)
122 : : {
123 : : /* other plugins can perform things here */
124 : 100 : }
125 : :
126 : : /* specify output plugin callbacks */
127 : : void
128 : 315 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
129 : : {
130 : 315 : cb->startup_cb = pg_decode_startup;
131 : 315 : cb->begin_cb = pg_decode_begin_txn;
132 : 315 : cb->change_cb = pg_decode_change;
2199 peter_e@gmx.net 133 : 315 : cb->truncate_cb = pg_decode_truncate;
3695 rhaas@postgresql.org 134 : 315 : cb->commit_cb = pg_decode_commit_txn;
3273 andres@anarazel.de 135 : 315 : cb->filter_by_origin_cb = pg_decode_filter;
3695 rhaas@postgresql.org 136 : 315 : cb->shutdown_cb = pg_decode_shutdown;
2930 simon@2ndQuadrant.co 137 : 315 : cb->message_cb = pg_decode_message;
1201 akapila@postgresql.o 138 : 315 : cb->filter_prepare_cb = pg_decode_filter_prepare;
139 : 315 : cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
140 : 315 : cb->prepare_cb = pg_decode_prepare_txn;
141 : 315 : cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
142 : 315 : cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
1356 143 : 315 : cb->stream_start_cb = pg_decode_stream_start;
144 : 315 : cb->stream_stop_cb = pg_decode_stream_stop;
145 : 315 : cb->stream_abort_cb = pg_decode_stream_abort;
1201 146 : 315 : cb->stream_prepare_cb = pg_decode_stream_prepare;
1356 147 : 315 : cb->stream_commit_cb = pg_decode_stream_commit;
148 : 315 : cb->stream_change_cb = pg_decode_stream_change;
149 : 315 : cb->stream_message_cb = pg_decode_stream_message;
150 : 315 : cb->stream_truncate_cb = pg_decode_stream_truncate;
3695 rhaas@postgresql.org 151 : 315 : }
152 : :
153 : :
154 : : /* initialize this plugin */
155 : : static void
156 : 315 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
157 : : bool is_init)
158 : : {
159 : : ListCell *option;
160 : : TestDecodingData *data;
1345 akapila@postgresql.o 161 : 315 : bool enable_streaming = false;
162 : :
3513 andres@anarazel.de 163 : 315 : data = palloc0(sizeof(TestDecodingData));
3695 rhaas@postgresql.org 164 : 315 : data->context = AllocSetContextCreate(ctx->context,
165 : : "text conversion context",
166 : : ALLOCSET_DEFAULT_SIZES);
167 : 315 : data->include_xids = true;
168 : 315 : data->include_timestamp = false;
3513 andres@anarazel.de 169 : 315 : data->skip_empty_xacts = false;
3273 170 : 315 : data->only_local = false;
171 : :
3695 rhaas@postgresql.org 172 : 315 : ctx->output_plugin_private = data;
173 : :
174 : 315 : opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
2216 peter_e@gmx.net 175 : 315 : opt->receive_rewrites = false;
176 : :
3695 rhaas@postgresql.org 177 [ + + + + : 653 : foreach(option, ctx->output_plugin_options)
+ + ]
178 : : {
179 : 341 : DefElem *elem = lfirst(option);
180 : :
181 [ + - - + ]: 341 : Assert(elem->arg == NULL || IsA(elem->arg, String));
182 : :
183 [ + + ]: 341 : if (strcmp(elem->defname, "include-xids") == 0)
184 : : {
185 : : /* if option does not provide a value, it means its value is true */
186 [ - + ]: 162 : if (elem->arg == NULL)
3695 rhaas@postgresql.org 187 :UBC 0 : data->include_xids = true;
3695 rhaas@postgresql.org 188 [ + + ]:CBC 162 : else if (!parse_bool(strVal(elem->arg), &data->include_xids))
189 [ + - ]: 2 : ereport(ERROR,
190 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
192 : : strVal(elem->arg), elem->defname)));
193 : : }
194 [ + + ]: 179 : else if (strcmp(elem->defname, "include-timestamp") == 0)
195 : : {
196 [ - + ]: 1 : if (elem->arg == NULL)
3695 rhaas@postgresql.org 197 :UBC 0 : data->include_timestamp = true;
3695 rhaas@postgresql.org 198 [ - + ]:CBC 1 : else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
3695 rhaas@postgresql.org 199 [ # # ]:UBC 0 : ereport(ERROR,
200 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
201 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
202 : : strVal(elem->arg), elem->defname)));
203 : : }
3695 rhaas@postgresql.org 204 [ + + ]:CBC 178 : else if (strcmp(elem->defname, "force-binary") == 0)
205 : : {
206 : : bool force_binary;
207 : :
208 [ - + ]: 6 : if (elem->arg == NULL)
3695 rhaas@postgresql.org 209 :UBC 0 : continue;
3695 rhaas@postgresql.org 210 [ - + ]:CBC 6 : else if (!parse_bool(strVal(elem->arg), &force_binary))
3695 rhaas@postgresql.org 211 [ # # ]:UBC 0 : ereport(ERROR,
212 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
213 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
214 : : strVal(elem->arg), elem->defname)));
215 : :
3695 rhaas@postgresql.org 216 [ + + ]:CBC 6 : if (force_binary)
217 : 2 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
218 : : }
3513 andres@anarazel.de 219 [ + + ]: 172 : else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
220 : : {
221 : :
222 [ - + ]: 159 : if (elem->arg == NULL)
3513 andres@anarazel.de 223 :UBC 0 : data->skip_empty_xacts = true;
3513 andres@anarazel.de 224 [ - + ]:CBC 159 : else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
3513 andres@anarazel.de 225 [ # # ]:UBC 0 : ereport(ERROR,
226 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
227 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
228 : : strVal(elem->arg), elem->defname)));
229 : : }
3273 andres@anarazel.de 230 [ + + ]:CBC 13 : else if (strcmp(elem->defname, "only-local") == 0)
231 : : {
232 : :
233 [ - + ]: 3 : if (elem->arg == NULL)
3273 andres@anarazel.de 234 :UBC 0 : data->only_local = true;
3273 andres@anarazel.de 235 [ - + ]:CBC 3 : else if (!parse_bool(strVal(elem->arg), &data->only_local))
3273 andres@anarazel.de 236 [ # # ]:UBC 0 : ereport(ERROR,
237 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
238 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
239 : : strVal(elem->arg), elem->defname)));
240 : : }
2216 peter_e@gmx.net 241 [ + + ]:CBC 10 : else if (strcmp(elem->defname, "include-rewrites") == 0)
242 : : {
243 : :
244 [ - + ]: 1 : if (elem->arg == NULL)
2216 peter_e@gmx.net 245 :UBC 0 : continue;
2216 peter_e@gmx.net 246 [ - + ]:CBC 1 : else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
2216 peter_e@gmx.net 247 [ # # ]:UBC 0 : ereport(ERROR,
248 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
249 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
250 : : strVal(elem->arg), elem->defname)));
251 : : }
1345 akapila@postgresql.o 252 [ + + ]:CBC 9 : else if (strcmp(elem->defname, "stream-changes") == 0)
253 : : {
254 [ - + ]: 8 : if (elem->arg == NULL)
1345 akapila@postgresql.o 255 :UBC 0 : continue;
1345 akapila@postgresql.o 256 [ - + ]:CBC 8 : else if (!parse_bool(strVal(elem->arg), &enable_streaming))
1345 akapila@postgresql.o 257 [ # # ]:UBC 0 : ereport(ERROR,
258 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
259 : : errmsg("could not parse value \"%s\" for parameter \"%s\"",
260 : : strVal(elem->arg), elem->defname)));
261 : : }
262 : : else
263 : : {
3695 rhaas@postgresql.org 264 [ + - + - ]:CBC 1 : ereport(ERROR,
265 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
266 : : errmsg("option \"%s\" = \"%s\" is unknown",
267 : : elem->defname,
268 : : elem->arg ? strVal(elem->arg) : "(null)")));
269 : : }
270 : : }
271 : :
1345 akapila@postgresql.o 272 : 312 : ctx->streaming &= enable_streaming;
3695 rhaas@postgresql.org 273 : 312 : }
274 : :
275 : : /* cleanup this plugin's resources */
276 : : static void
277 : 302 : pg_decode_shutdown(LogicalDecodingContext *ctx)
278 : : {
279 : 302 : TestDecodingData *data = ctx->output_plugin_private;
280 : :
281 : : /* cleanup our own resources via memory context reset */
282 : 302 : MemoryContextDelete(data->context);
283 : 302 : }
284 : :
285 : : /* BEGIN callback */
286 : : static void
287 : 432 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
288 : : {
289 : 432 : TestDecodingData *data = ctx->output_plugin_private;
290 : : TestDecodingTxnData *txndata =
331 tgl@sss.pgh.pa.us 291 : 432 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
292 : :
1244 akapila@postgresql.o 293 : 432 : txndata->xact_wrote_changes = false;
294 : 432 : txn->output_plugin_private = txndata;
295 : :
296 : : /*
297 : : * If asked to skip empty transactions, we'll emit BEGIN at the point
298 : : * where the first operation is received for this transaction.
299 : : */
3513 andres@anarazel.de 300 [ + + ]: 432 : if (data->skip_empty_xacts)
301 : 388 : return;
302 : :
303 : 44 : pg_output_begin(ctx, data, txn, true);
304 : : }
305 : :
306 : : static void
307 : 267 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
308 : : {
309 : 267 : OutputPluginPrepareWrite(ctx, last_write);
3695 rhaas@postgresql.org 310 [ + + ]: 267 : if (data->include_xids)
311 : 39 : appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
312 : : else
313 : 228 : appendStringInfoString(ctx->out, "BEGIN");
3513 andres@anarazel.de 314 : 267 : OutputPluginWrite(ctx, last_write);
3695 rhaas@postgresql.org 315 : 267 : }
316 : :
317 : : /* COMMIT callback */
318 : : static void
319 : 432 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
320 : : XLogRecPtr commit_lsn)
321 : : {
322 : 432 : TestDecodingData *data = ctx->output_plugin_private;
1244 akapila@postgresql.o 323 : 432 : TestDecodingTxnData *txndata = txn->output_plugin_private;
324 : 432 : bool xact_wrote_changes = txndata->xact_wrote_changes;
325 : :
326 : 432 : pfree(txndata);
327 : 432 : txn->output_plugin_private = NULL;
328 : :
329 [ + + + + ]: 432 : if (data->skip_empty_xacts && !xact_wrote_changes)
3513 andres@anarazel.de 330 : 171 : return;
331 : :
3695 rhaas@postgresql.org 332 : 261 : OutputPluginPrepareWrite(ctx, true);
333 [ + + ]: 261 : if (data->include_xids)
334 : 39 : appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
335 : : else
336 : 222 : appendStringInfoString(ctx->out, "COMMIT");
337 : :
338 [ + + ]: 261 : if (data->include_timestamp)
339 : 1 : appendStringInfo(ctx->out, " (at %s)",
340 : : timestamptz_to_str(txn->xact_time.commit_time));
341 : :
342 : 261 : OutputPluginWrite(ctx, true);
343 : : }
344 : :
345 : : /* BEGIN PREPARE callback */
346 : : static void
1201 akapila@postgresql.o 347 : 6 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
348 : : {
349 : 6 : TestDecodingData *data = ctx->output_plugin_private;
350 : : TestDecodingTxnData *txndata =
331 tgl@sss.pgh.pa.us 351 : 6 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
352 : :
1201 akapila@postgresql.o 353 : 6 : txndata->xact_wrote_changes = false;
354 : 6 : txn->output_plugin_private = txndata;
355 : :
356 : : /*
357 : : * If asked to skip empty transactions, we'll emit BEGIN at the point
358 : : * where the first operation is received for this transaction.
359 : : */
360 [ + - ]: 6 : if (data->skip_empty_xacts)
361 : 6 : return;
362 : :
1201 akapila@postgresql.o 363 :UBC 0 : pg_output_begin(ctx, data, txn, true);
364 : : }
365 : :
366 : : /* PREPARE callback */
367 : : static void
1201 akapila@postgresql.o 368 :CBC 6 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
369 : : XLogRecPtr prepare_lsn)
370 : : {
371 : 6 : TestDecodingData *data = ctx->output_plugin_private;
372 : 6 : TestDecodingTxnData *txndata = txn->output_plugin_private;
373 : :
374 : : /*
375 : : * If asked to skip empty transactions, we'll emit PREPARE at the point
376 : : * where the first operation is received for this transaction.
377 : : */
378 [ + - - + ]: 6 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
1201 akapila@postgresql.o 379 :UBC 0 : return;
380 : :
1201 akapila@postgresql.o 381 :CBC 6 : OutputPluginPrepareWrite(ctx, true);
382 : :
383 : 6 : appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
384 : 6 : quote_literal_cstr(txn->gid));
385 : :
386 [ - + ]: 6 : if (data->include_xids)
1201 akapila@postgresql.o 387 :UBC 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
388 : :
1201 akapila@postgresql.o 389 [ - + ]:CBC 6 : if (data->include_timestamp)
1201 akapila@postgresql.o 390 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
391 : : timestamptz_to_str(txn->xact_time.prepare_time));
392 : :
1201 akapila@postgresql.o 393 :CBC 6 : OutputPluginWrite(ctx, true);
394 : : }
395 : :
396 : : /* COMMIT PREPARED callback */
397 : : static void
398 : 6 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
399 : : XLogRecPtr commit_lsn)
400 : : {
401 : 6 : TestDecodingData *data = ctx->output_plugin_private;
402 : :
403 : 6 : OutputPluginPrepareWrite(ctx, true);
404 : :
405 : 6 : appendStringInfo(ctx->out, "COMMIT PREPARED %s",
406 : 6 : quote_literal_cstr(txn->gid));
407 : :
408 [ - + ]: 6 : if (data->include_xids)
1201 akapila@postgresql.o 409 :UBC 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
410 : :
1201 akapila@postgresql.o 411 [ - + ]:CBC 6 : if (data->include_timestamp)
1201 akapila@postgresql.o 412 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
413 : : timestamptz_to_str(txn->xact_time.commit_time));
414 : :
1201 akapila@postgresql.o 415 :CBC 6 : OutputPluginWrite(ctx, true);
416 : 6 : }
417 : :
418 : : /* ROLLBACK PREPARED callback */
419 : : static void
420 : 1 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
421 : : ReorderBufferTXN *txn,
422 : : XLogRecPtr prepare_end_lsn,
423 : : TimestampTz prepare_time)
424 : : {
425 : 1 : TestDecodingData *data = ctx->output_plugin_private;
426 : :
427 : 1 : OutputPluginPrepareWrite(ctx, true);
428 : :
429 : 1 : appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
430 : 1 : quote_literal_cstr(txn->gid));
431 : :
432 [ - + ]: 1 : if (data->include_xids)
1201 akapila@postgresql.o 433 :UBC 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
434 : :
1201 akapila@postgresql.o 435 [ - + ]:CBC 1 : if (data->include_timestamp)
1201 akapila@postgresql.o 436 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
437 : : timestamptz_to_str(txn->xact_time.commit_time));
438 : :
1201 akapila@postgresql.o 439 :CBC 1 : OutputPluginWrite(ctx, true);
440 : 1 : }
441 : :
442 : : /*
443 : : * Filter out two-phase transactions.
444 : : *
445 : : * Each plugin can implement its own filtering logic. Here we demonstrate a
446 : : * simple logic by checking the GID. If the GID contains the "_nodecode"
447 : : * substring, then we filter it out.
448 : : */
449 : : static bool
1111 450 : 115 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
451 : : const char *gid)
452 : : {
1201 453 [ + + ]: 115 : if (strstr(gid, "_nodecode") != NULL)
454 : 8 : return true;
455 : :
456 : 107 : return false;
457 : : }
458 : :
459 : : static bool
3273 andres@anarazel.de 460 : 1195092 : pg_decode_filter(LogicalDecodingContext *ctx,
461 : : RepOriginId origin_id)
462 : : {
463 : 1195092 : TestDecodingData *data = ctx->output_plugin_private;
464 : :
465 [ + + + + ]: 1195092 : if (data->only_local && origin_id != InvalidRepOriginId)
466 : 9 : return true;
467 : 1195083 : return false;
468 : : }
469 : :
470 : : /*
471 : : * Print literal `outputstr' already represented as string of type `typid'
472 : : * into stringbuf `s'.
473 : : *
474 : : * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
475 : : * if standard_conforming_strings were enabled.
476 : : */
477 : : static void
3695 rhaas@postgresql.org 478 : 176044 : print_literal(StringInfo s, Oid typid, char *outputstr)
479 : : {
480 : : const char *valptr;
481 : :
482 [ + - - + ]: 176044 : switch (typid)
483 : : {
484 : 60304 : case INT2OID:
485 : : case INT4OID:
486 : : case INT8OID:
487 : : case OIDOID:
488 : : case FLOAT4OID:
489 : : case FLOAT8OID:
490 : : case NUMERICOID:
491 : : /* NB: We don't care about Inf, NaN et al. */
492 : 60304 : appendStringInfoString(s, outputstr);
493 : 60304 : break;
494 : :
3695 rhaas@postgresql.org 495 :UBC 0 : case BITOID:
496 : : case VARBITOID:
497 : 0 : appendStringInfo(s, "B'%s'", outputstr);
498 : 0 : break;
499 : :
500 : 0 : case BOOLOID:
501 [ # # ]: 0 : if (strcmp(outputstr, "t") == 0)
502 : 0 : appendStringInfoString(s, "true");
503 : : else
504 : 0 : appendStringInfoString(s, "false");
505 : 0 : break;
506 : :
3695 rhaas@postgresql.org 507 :CBC 115740 : default:
508 : 115740 : appendStringInfoChar(s, '\'');
509 [ + + ]: 5427304 : for (valptr = outputstr; *valptr; valptr++)
510 : : {
511 : 5311564 : char ch = *valptr;
512 : :
513 [ + + ]: 5311564 : if (SQL_STR_DOUBLE(ch, false))
514 : 64 : appendStringInfoChar(s, ch);
515 : 5311564 : appendStringInfoChar(s, ch);
516 : : }
517 : 115740 : appendStringInfoChar(s, '\'');
518 : 115740 : break;
519 : : }
520 : 176044 : }
521 : :
522 : : /* print the tuple 'tuple' into the StringInfo s */
523 : : static void
524 : 145611 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
525 : : {
526 : : int natt;
527 : :
528 : : /* print all columns individually */
529 [ + + ]: 347412 : for (natt = 0; natt < tupdesc->natts; natt++)
530 : : {
531 : : Form_pg_attribute attr; /* the attribute itself */
532 : : Oid typid; /* type of current attribute */
533 : : Oid typoutput; /* output function */
534 : : bool typisvarlena;
535 : : Datum origval; /* possibly toasted Datum */
536 : : bool isnull; /* column is null? */
537 : :
2429 andres@anarazel.de 538 : 201801 : attr = TupleDescAttr(tupdesc, natt);
539 : :
540 : : /*
541 : : * don't print dropped columns, we can't be sure everything is
542 : : * available for them
543 : : */
3695 rhaas@postgresql.org 544 [ + + ]: 201801 : if (attr->attisdropped)
545 : 5130 : continue;
546 : :
547 : : /*
548 : : * Don't print system columns, oid will already have been printed if
549 : : * present.
550 : : */
551 [ - + ]: 201729 : if (attr->attnum < 0)
3695 rhaas@postgresql.org 552 :UBC 0 : continue;
553 : :
3695 rhaas@postgresql.org 554 :CBC 201729 : typid = attr->atttypid;
555 : :
556 : : /* get Datum from tuple */
3214 andres@anarazel.de 557 : 201729 : origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
558 : :
3695 rhaas@postgresql.org 559 [ + + + + ]: 201729 : if (isnull && skip_nulls)
560 : 5058 : continue;
561 : :
562 : : /* print attribute name */
563 : 196671 : appendStringInfoChar(s, ' ');
564 : 196671 : appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
565 : :
566 : : /* print attribute type */
567 : 196671 : appendStringInfoChar(s, '[');
568 : 196671 : appendStringInfoString(s, format_type_be(typid));
569 : 196671 : appendStringInfoChar(s, ']');
570 : :
571 : : /* query output function */
572 : 196671 : getTypeOutputInfo(typid,
573 : : &typoutput, &typisvarlena);
574 : :
575 : : /* print separator */
576 : 196671 : appendStringInfoChar(s, ':');
577 : :
578 : : /* print data */
579 [ + + ]: 196671 : if (isnull)
580 : 20615 : appendStringInfoString(s, "null");
581 [ + + + + : 176056 : else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
+ + ]
582 : 12 : appendStringInfoString(s, "unchanged-toast-datum");
583 [ + + ]: 176044 : else if (!typisvarlena)
584 : 60308 : print_literal(s, typid,
585 : : OidOutputFunctionCall(typoutput, origval));
586 : : else
587 : : {
588 : : Datum val; /* definitely detoasted Datum */
589 : :
590 : 115736 : val = PointerGetDatum(PG_DETOAST_DATUM(origval));
591 : 115736 : print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
592 : : }
593 : : }
594 : 145611 : }
595 : :
596 : : /*
597 : : * callback for individual changed tuples
598 : : */
599 : : static void
600 : 150600 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
601 : : Relation relation, ReorderBufferChange *change)
602 : : {
603 : : TestDecodingData *data;
604 : : TestDecodingTxnData *txndata;
605 : : Form_pg_class class_form;
606 : : TupleDesc tupdesc;
607 : : MemoryContext old;
608 : :
609 : 150600 : data = ctx->output_plugin_private;
1244 akapila@postgresql.o 610 : 150600 : txndata = txn->output_plugin_private;
611 : :
612 : : /* output BEGIN if we haven't yet */
613 [ + + + + ]: 150600 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
614 : : {
3513 andres@anarazel.de 615 : 214 : pg_output_begin(ctx, data, txn, false);
616 : : }
1244 akapila@postgresql.o 617 : 150600 : txndata->xact_wrote_changes = true;
618 : :
3695 rhaas@postgresql.org 619 : 150600 : class_form = RelationGetForm(relation);
620 : 150600 : tupdesc = RelationGetDescr(relation);
621 : :
622 : : /* Avoid leaking memory by using and resetting our own context */
623 : 150600 : old = MemoryContextSwitchTo(data->context);
624 : :
625 : 150600 : OutputPluginPrepareWrite(ctx, true);
626 : :
627 : 150600 : appendStringInfoString(ctx->out, "table ");
628 : 150600 : appendStringInfoString(ctx->out,
1536 alvherre@alvh.no-ip. 629 : 150600 : quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
2216 peter_e@gmx.net 630 [ + + ]: 150600 : class_form->relrewrite ?
631 : 1 : get_rel_name(class_form->relrewrite) :
632 : : NameStr(class_form->relname)));
3261 633 : 150600 : appendStringInfoChar(ctx->out, ':');
634 : :
3695 rhaas@postgresql.org 635 [ + + + - ]: 150600 : switch (change->action)
636 : : {
637 : 133035 : case REORDER_BUFFER_CHANGE_INSERT:
638 : 133035 : appendStringInfoString(ctx->out, " INSERT:");
3691 tgl@sss.pgh.pa.us 639 [ - + ]: 133035 : if (change->data.tp.newtuple == NULL)
3695 rhaas@postgresql.org 640 :UBC 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
641 : : else
3695 rhaas@postgresql.org 642 :CBC 133035 : tuple_to_stringinfo(ctx->out, tupdesc,
643 : : change->data.tp.newtuple,
644 : : false);
645 : 133035 : break;
646 : 7543 : case REORDER_BUFFER_CHANGE_UPDATE:
647 : 7543 : appendStringInfoString(ctx->out, " UPDATE:");
3691 tgl@sss.pgh.pa.us 648 [ + + ]: 7543 : if (change->data.tp.oldtuple != NULL)
649 : : {
3695 rhaas@postgresql.org 650 : 18 : appendStringInfoString(ctx->out, " old-key:");
651 : 18 : tuple_to_stringinfo(ctx->out, tupdesc,
652 : : change->data.tp.oldtuple,
653 : : true);
654 : 18 : appendStringInfoString(ctx->out, " new-tuple:");
655 : : }
656 : :
3691 tgl@sss.pgh.pa.us 657 [ - + ]: 7543 : if (change->data.tp.newtuple == NULL)
3695 rhaas@postgresql.org 658 :UBC 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
659 : : else
3695 rhaas@postgresql.org 660 :CBC 7543 : tuple_to_stringinfo(ctx->out, tupdesc,
661 : : change->data.tp.newtuple,
662 : : false);
663 : 7543 : break;
664 : 10022 : case REORDER_BUFFER_CHANGE_DELETE:
665 : 10022 : appendStringInfoString(ctx->out, " DELETE:");
666 : :
667 : : /* if there was no PK, we only know that a delete happened */
3691 tgl@sss.pgh.pa.us 668 [ + + ]: 10022 : if (change->data.tp.oldtuple == NULL)
3695 rhaas@postgresql.org 669 : 5007 : appendStringInfoString(ctx->out, " (no-tuple-data)");
670 : : /* In DELETE, only the replica identity is present; display that */
671 : : else
672 : 5015 : tuple_to_stringinfo(ctx->out, tupdesc,
673 : : change->data.tp.oldtuple,
674 : : true);
675 : 10022 : break;
3691 tgl@sss.pgh.pa.us 676 :UBC 0 : default:
677 : 0 : Assert(false);
678 : : }
679 : :
3695 rhaas@postgresql.org 680 :CBC 150600 : MemoryContextSwitchTo(old);
681 : 150600 : MemoryContextReset(data->context);
682 : :
683 : 150600 : OutputPluginWrite(ctx, true);
684 : 150600 : }
685 : :
686 : : static void
2199 peter_e@gmx.net 687 : 7 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
688 : : int nrelations, Relation relations[], ReorderBufferChange *change)
689 : : {
690 : : TestDecodingData *data;
691 : : TestDecodingTxnData *txndata;
692 : : MemoryContext old;
693 : : int i;
694 : :
695 : 7 : data = ctx->output_plugin_private;
1244 akapila@postgresql.o 696 : 7 : txndata = txn->output_plugin_private;
697 : :
698 : : /* output BEGIN if we haven't yet */
699 [ + + + - ]: 7 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
700 : : {
2199 peter_e@gmx.net 701 : 6 : pg_output_begin(ctx, data, txn, false);
702 : : }
1244 akapila@postgresql.o 703 : 7 : txndata->xact_wrote_changes = true;
704 : :
705 : : /* Avoid leaking memory by using and resetting our own context */
2199 peter_e@gmx.net 706 : 7 : old = MemoryContextSwitchTo(data->context);
707 : :
708 : 7 : OutputPluginPrepareWrite(ctx, true);
709 : :
710 : 7 : appendStringInfoString(ctx->out, "table ");
711 : :
712 [ + + ]: 15 : for (i = 0; i < nrelations; i++)
713 : : {
714 [ + + ]: 8 : if (i > 0)
715 : 1 : appendStringInfoString(ctx->out, ", ");
716 : :
717 : 8 : appendStringInfoString(ctx->out,
718 : 8 : quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
719 : 8 : NameStr(relations[i]->rd_rel->relname)));
720 : : }
721 : :
722 : 7 : appendStringInfoString(ctx->out, ": TRUNCATE:");
723 : :
724 [ + + ]: 7 : if (change->data.truncate.restart_seqs
725 [ - + ]: 6 : || change->data.truncate.cascade)
726 : : {
727 [ + - ]: 1 : if (change->data.truncate.restart_seqs)
1746 drowley@postgresql.o 728 : 1 : appendStringInfoString(ctx->out, " restart_seqs");
2199 peter_e@gmx.net 729 [ + - ]: 1 : if (change->data.truncate.cascade)
1746 drowley@postgresql.o 730 : 1 : appendStringInfoString(ctx->out, " cascade");
731 : : }
732 : : else
2199 peter_e@gmx.net 733 : 6 : appendStringInfoString(ctx->out, " (no-flags)");
734 : :
735 : 7 : MemoryContextSwitchTo(old);
736 : 7 : MemoryContextReset(data->context);
737 : :
738 : 7 : OutputPluginWrite(ctx, true);
739 : 7 : }
740 : :
741 : : static void
2930 simon@2ndQuadrant.co 742 : 9 : pg_decode_message(LogicalDecodingContext *ctx,
743 : : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
744 : : const char *prefix, Size sz, const char *message)
745 : : {
278 akapila@postgresql.o 746 :GNC 9 : TestDecodingData *data = ctx->output_plugin_private;
747 : : TestDecodingTxnData *txndata;
748 : :
749 [ + + ]: 9 : txndata = transactional ? txn->output_plugin_private : NULL;
750 : :
751 : : /* output BEGIN if we haven't yet for transactional messages */
752 [ + + + - : 9 : if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ + ]
753 : 3 : pg_output_begin(ctx, data, txn, false);
754 : :
755 [ + + ]: 9 : if (transactional)
756 : 5 : txndata->xact_wrote_changes = true;
757 : :
2930 simon@2ndQuadrant.co 758 :CBC 9 : OutputPluginPrepareWrite(ctx, true);
759 : 9 : appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
760 : : transactional, prefix, sz);
761 : 9 : appendBinaryStringInfo(ctx->out, message, sz);
762 : 9 : OutputPluginWrite(ctx, true);
763 : 9 : }
764 : :
765 : : static void
1356 akapila@postgresql.o 766 : 11 : pg_decode_stream_start(LogicalDecodingContext *ctx,
767 : : ReorderBufferTXN *txn)
768 : : {
769 : 11 : TestDecodingData *data = ctx->output_plugin_private;
1244 770 : 11 : TestDecodingTxnData *txndata = txn->output_plugin_private;
771 : :
772 : : /*
773 : : * Allocate the txn plugin data for the first stream in the transaction.
774 : : */
775 [ + + ]: 11 : if (txndata == NULL)
776 : : {
777 : : txndata =
778 : 7 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
779 : 7 : txndata->xact_wrote_changes = false;
780 : 7 : txn->output_plugin_private = txndata;
781 : : }
782 : :
783 : 11 : txndata->stream_wrote_changes = false;
1311 784 [ + - ]: 11 : if (data->skip_empty_xacts)
785 : 11 : return;
1311 akapila@postgresql.o 786 :UBC 0 : pg_output_stream_start(ctx, data, txn, true);
787 : : }
788 : :
789 : : static void
1311 akapila@postgresql.o 790 :CBC 9 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
791 : : {
792 : 9 : OutputPluginPrepareWrite(ctx, last_write);
1356 793 [ - + ]: 9 : if (data->include_xids)
1356 akapila@postgresql.o 794 :UBC 0 : appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
795 : : else
1277 drowley@postgresql.o 796 :CBC 9 : appendStringInfoString(ctx->out, "opening a streamed block for transaction");
1311 akapila@postgresql.o 797 : 9 : OutputPluginWrite(ctx, last_write);
1356 798 : 9 : }
799 : :
800 : : static void
801 : 11 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
802 : : ReorderBufferTXN *txn)
803 : : {
804 : 11 : TestDecodingData *data = ctx->output_plugin_private;
1244 805 : 11 : TestDecodingTxnData *txndata = txn->output_plugin_private;
806 : :
807 [ + - + + ]: 11 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
1311 808 : 2 : return;
809 : :
1356 810 : 9 : OutputPluginPrepareWrite(ctx, true);
811 [ - + ]: 9 : if (data->include_xids)
1356 akapila@postgresql.o 812 :UBC 0 : appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
813 : : else
1277 drowley@postgresql.o 814 :CBC 9 : appendStringInfoString(ctx->out, "closing a streamed block for transaction");
1356 akapila@postgresql.o 815 : 9 : OutputPluginWrite(ctx, true);
816 : : }
817 : :
818 : : static void
819 : 3 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
820 : : ReorderBufferTXN *txn,
821 : : XLogRecPtr abort_lsn)
822 : : {
823 : 3 : TestDecodingData *data = ctx->output_plugin_private;
824 : :
825 : : /*
826 : : * stream abort can be sent for an individual subtransaction but we
827 : : * maintain the output_plugin_private only under the toptxn so if this is
828 : : * not the toptxn then fetch the toptxn.
829 : : */
394 830 [ + - ]: 3 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
1244 831 : 3 : TestDecodingTxnData *txndata = toptxn->output_plugin_private;
832 : 3 : bool xact_wrote_changes = txndata->xact_wrote_changes;
833 : :
394 834 [ - + ]: 3 : if (rbtxn_is_toptxn(txn))
835 : : {
1244 akapila@postgresql.o 836 [ # # ]:UBC 0 : Assert(txn->output_plugin_private != NULL);
837 : 0 : pfree(txndata);
838 : 0 : txn->output_plugin_private = NULL;
839 : : }
840 : :
1244 akapila@postgresql.o 841 [ + - - + ]:CBC 3 : if (data->skip_empty_xacts && !xact_wrote_changes)
1311 akapila@postgresql.o 842 :LBC (3) : return;
843 : :
1356 akapila@postgresql.o 844 :GBC 3 : OutputPluginPrepareWrite(ctx, true);
845 [ - + ]: 3 : if (data->include_xids)
1356 akapila@postgresql.o 846 :UBC 0 : appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
847 : : else
1277 drowley@postgresql.o 848 :GBC 3 : appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
1356 akapila@postgresql.o 849 : 3 : OutputPluginWrite(ctx, true);
850 : : }
851 : :
852 : : static void
1201 akapila@postgresql.o 853 :CBC 1 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
854 : : ReorderBufferTXN *txn,
855 : : XLogRecPtr prepare_lsn)
856 : : {
857 : 1 : TestDecodingData *data = ctx->output_plugin_private;
858 : 1 : TestDecodingTxnData *txndata = txn->output_plugin_private;
859 : :
860 [ + - - + ]: 1 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
1201 akapila@postgresql.o 861 :UBC 0 : return;
862 : :
1201 akapila@postgresql.o 863 :CBC 1 : OutputPluginPrepareWrite(ctx, true);
864 : :
865 [ - + ]: 1 : if (data->include_xids)
1201 akapila@postgresql.o 866 :UBC 0 : appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
867 : 0 : quote_literal_cstr(txn->gid), txn->xid);
868 : : else
1201 akapila@postgresql.o 869 :CBC 1 : appendStringInfo(ctx->out, "preparing streamed transaction %s",
870 : 1 : quote_literal_cstr(txn->gid));
871 : :
872 [ - + ]: 1 : if (data->include_timestamp)
1201 akapila@postgresql.o 873 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
874 : : timestamptz_to_str(txn->xact_time.prepare_time));
875 : :
1201 akapila@postgresql.o 876 :CBC 1 : OutputPluginWrite(ctx, true);
877 : : }
878 : :
879 : : static void
1356 880 : 4 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
881 : : ReorderBufferTXN *txn,
882 : : XLogRecPtr commit_lsn)
883 : : {
884 : 4 : TestDecodingData *data = ctx->output_plugin_private;
1244 885 : 4 : TestDecodingTxnData *txndata = txn->output_plugin_private;
886 : 4 : bool xact_wrote_changes = txndata->xact_wrote_changes;
887 : :
888 : 4 : pfree(txndata);
889 : 4 : txn->output_plugin_private = NULL;
890 : :
891 [ + - - + ]: 4 : if (data->skip_empty_xacts && !xact_wrote_changes)
1311 akapila@postgresql.o 892 :UBC 0 : return;
893 : :
1356 akapila@postgresql.o 894 :CBC 4 : OutputPluginPrepareWrite(ctx, true);
895 : :
896 [ - + ]: 4 : if (data->include_xids)
1356 akapila@postgresql.o 897 :UBC 0 : appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
898 : : else
1277 drowley@postgresql.o 899 :CBC 4 : appendStringInfoString(ctx->out, "committing streamed transaction");
900 : :
1356 akapila@postgresql.o 901 [ - + ]: 4 : if (data->include_timestamp)
1356 akapila@postgresql.o 902 :UBC 0 : appendStringInfo(ctx->out, " (at %s)",
903 : : timestamptz_to_str(txn->xact_time.commit_time));
904 : :
1356 akapila@postgresql.o 905 :CBC 4 : OutputPluginWrite(ctx, true);
906 : : }
907 : :
908 : : /*
909 : : * In streaming mode, we don't display the changes as the transaction can abort
910 : : * at a later point in time. We don't want users to see the changes until the
911 : : * transaction is committed.
912 : : */
913 : : static void
914 : 63 : pg_decode_stream_change(LogicalDecodingContext *ctx,
915 : : ReorderBufferTXN *txn,
916 : : Relation relation,
917 : : ReorderBufferChange *change)
918 : : {
919 : 63 : TestDecodingData *data = ctx->output_plugin_private;
1244 920 : 63 : TestDecodingTxnData *txndata = txn->output_plugin_private;
921 : :
922 : : /* output stream start if we haven't yet */
923 [ + - + + ]: 63 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
924 : : {
1311 925 : 6 : pg_output_stream_start(ctx, data, txn, false);
926 : : }
1244 927 : 63 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
928 : :
1356 929 : 63 : OutputPluginPrepareWrite(ctx, true);
930 [ - + ]: 63 : if (data->include_xids)
1356 akapila@postgresql.o 931 :UBC 0 : appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
932 : : else
1277 drowley@postgresql.o 933 :CBC 63 : appendStringInfoString(ctx->out, "streaming change for transaction");
1356 akapila@postgresql.o 934 : 63 : OutputPluginWrite(ctx, true);
935 : 63 : }
936 : :
937 : : /*
938 : : * In streaming mode, we don't display the contents for transactional messages
939 : : * as the transaction can abort at a later point in time. We don't want users to
940 : : * see the message contents until the transaction is committed.
941 : : */
942 : : static void
943 : 3 : pg_decode_stream_message(LogicalDecodingContext *ctx,
944 : : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
945 : : const char *prefix, Size sz, const char *message)
946 : : {
947 : : /* Output stream start if we haven't yet for transactional messages. */
167 akapila@postgresql.o 948 [ + - ]:GNC 3 : if (transactional)
949 : : {
950 : 3 : TestDecodingData *data = ctx->output_plugin_private;
951 : 3 : TestDecodingTxnData *txndata = txn->output_plugin_private;
952 : :
953 [ + - + - ]: 3 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
954 : : {
955 : 3 : pg_output_stream_start(ctx, data, txn, false);
956 : : }
957 : 3 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
958 : : }
959 : :
1356 akapila@postgresql.o 960 :CBC 3 : OutputPluginPrepareWrite(ctx, true);
961 : :
962 [ + - ]: 3 : if (transactional)
963 : : {
964 : 3 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
965 : : transactional, prefix, sz);
966 : : }
967 : : else
968 : : {
1356 akapila@postgresql.o 969 :UBC 0 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
970 : : transactional, prefix, sz);
971 : 0 : appendBinaryStringInfo(ctx->out, message, sz);
972 : : }
973 : :
1356 akapila@postgresql.o 974 :CBC 3 : OutputPluginWrite(ctx, true);
975 : 3 : }
976 : :
977 : : /*
978 : : * In streaming mode, we don't display the detailed information of Truncate.
979 : : * See pg_decode_stream_change.
980 : : */
981 : : static void
1356 akapila@postgresql.o 982 :UBC 0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
983 : : int nrelations, Relation relations[],
984 : : ReorderBufferChange *change)
985 : : {
986 : 0 : TestDecodingData *data = ctx->output_plugin_private;
1244 987 : 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
988 : :
989 [ # # # # ]: 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
990 : : {
1311 991 : 0 : pg_output_stream_start(ctx, data, txn, false);
992 : : }
1244 993 : 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
994 : :
1356 995 : 0 : OutputPluginPrepareWrite(ctx, true);
996 [ # # ]: 0 : if (data->include_xids)
997 : 0 : appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
998 : : else
1277 drowley@postgresql.o 999 : 0 : appendStringInfoString(ctx->out, "streaming truncate for transaction");
1356 akapila@postgresql.o 1000 : 0 : OutputPluginWrite(ctx, true);
1001 : 0 : }
|