Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * test_decoding.c
4 : * example logical decoding output plugin
5 : *
6 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/test_decoding/test_decoding.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "catalog/pg_type.h"
16 :
17 : #include "replication/logical.h"
18 : #include "replication/origin.h"
19 :
20 : #include "utils/builtins.h"
21 : #include "utils/lsyscache.h"
22 : #include "utils/memutils.h"
23 : #include "utils/rel.h"
24 :
3324 rhaas 25 CBC 92 : PG_MODULE_MAGIC;
26 :
27 : typedef struct
28 : {
29 : MemoryContext context;
30 : bool include_xids;
31 : bool include_timestamp;
32 : bool skip_empty_xacts;
33 : bool only_local;
34 : } TestDecodingData;
35 :
36 : /*
37 : * Maintain the per-transaction level variables to track whether the
38 : * transaction and or streams have written any changes. In streaming mode the
39 : * transaction can be decoded in streams so along with maintaining whether the
40 : * transaction has written any changes, we also need to track whether the
41 : * current stream has written any changes. This is required so that if user
42 : * has requested to skip the empty transactions we can skip the empty streams
43 : * even though the transaction has written some changes.
44 : */
45 : typedef struct
46 : {
47 : bool xact_wrote_changes;
48 : bool stream_wrote_changes;
49 : } TestDecodingTxnData;
50 :
51 : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
52 : bool is_init);
53 : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
54 : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn);
56 : static void pg_output_begin(LogicalDecodingContext *ctx,
57 : TestDecodingData *data,
58 : ReorderBufferTXN *txn,
59 : bool last_write);
60 : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
61 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
62 : static void pg_decode_change(LogicalDecodingContext *ctx,
63 : ReorderBufferTXN *txn, Relation relation,
64 : ReorderBufferChange *change);
65 : static void pg_decode_truncate(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn,
67 : int nrelations, Relation relations[],
68 : ReorderBufferChange *change);
69 : static bool pg_decode_filter(LogicalDecodingContext *ctx,
70 : RepOriginId origin_id);
71 : static void pg_decode_message(LogicalDecodingContext *ctx,
72 : ReorderBufferTXN *txn, XLogRecPtr lsn,
73 : bool transactional, const char *prefix,
74 : Size sz, const char *message);
75 : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
76 : TransactionId xid,
77 : const char *gid);
78 : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
79 : ReorderBufferTXN *txn);
80 : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn,
82 : XLogRecPtr prepare_lsn);
83 : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn,
85 : XLogRecPtr commit_lsn);
86 : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
87 : ReorderBufferTXN *txn,
88 : XLogRecPtr prepare_end_lsn,
89 : TimestampTz prepare_time);
90 : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
91 : ReorderBufferTXN *txn);
92 : static void pg_output_stream_start(LogicalDecodingContext *ctx,
93 : TestDecodingData *data,
94 : ReorderBufferTXN *txn,
95 : bool last_write);
96 : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
97 : ReorderBufferTXN *txn);
98 : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
99 : ReorderBufferTXN *txn,
100 : XLogRecPtr abort_lsn);
101 : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
102 : ReorderBufferTXN *txn,
103 : XLogRecPtr prepare_lsn);
104 : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
105 : ReorderBufferTXN *txn,
106 : XLogRecPtr commit_lsn);
107 : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
108 : ReorderBufferTXN *txn,
109 : Relation relation,
110 : ReorderBufferChange *change);
111 : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
112 : ReorderBufferTXN *txn, XLogRecPtr lsn,
113 : bool transactional, const char *prefix,
114 : Size sz, const char *message);
115 : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
116 : ReorderBufferTXN *txn,
985 akapila 117 ECB : int nrelations, Relation relations[],
118 : ReorderBufferChange *change);
119 :
3324 rhaas 120 : void
3324 rhaas 121 GIC 92 : _PG_init(void)
122 : {
123 : /* other plugins can perform things here */
3324 rhaas 124 CBC 92 : }
125 :
3324 rhaas 126 ECB : /* specify output plugin callbacks */
127 : void
3324 rhaas 128 CBC 304 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
3324 rhaas 129 ECB : {
3324 rhaas 130 CBC 304 : cb->startup_cb = pg_decode_startup;
131 304 : cb->begin_cb = pg_decode_begin_txn;
132 304 : cb->change_cb = pg_decode_change;
1828 peter_e 133 304 : cb->truncate_cb = pg_decode_truncate;
3324 rhaas 134 304 : cb->commit_cb = pg_decode_commit_txn;
2902 andres 135 304 : cb->filter_by_origin_cb = pg_decode_filter;
3324 rhaas 136 304 : cb->shutdown_cb = pg_decode_shutdown;
2559 simon 137 304 : cb->message_cb = pg_decode_message;
830 akapila 138 304 : cb->filter_prepare_cb = pg_decode_filter_prepare;
139 304 : cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
140 304 : cb->prepare_cb = pg_decode_prepare_txn;
141 304 : cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
142 304 : cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
985 143 304 : cb->stream_start_cb = pg_decode_stream_start;
144 304 : cb->stream_stop_cb = pg_decode_stream_stop;
145 304 : cb->stream_abort_cb = pg_decode_stream_abort;
830 akapila 146 GIC 304 : cb->stream_prepare_cb = pg_decode_stream_prepare;
985 147 304 : cb->stream_commit_cb = pg_decode_stream_commit;
148 304 : cb->stream_change_cb = pg_decode_stream_change;
149 304 : cb->stream_message_cb = pg_decode_stream_message;
985 akapila 150 CBC 304 : cb->stream_truncate_cb = pg_decode_stream_truncate;
3324 rhaas 151 GIC 304 : }
152 :
153 :
154 : /* initialize this plugin */
3324 rhaas 155 ECB : static void
3324 rhaas 156 GIC 304 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
3324 rhaas 157 ECB : bool is_init)
158 : {
159 : ListCell *option;
160 : TestDecodingData *data;
974 akapila 161 CBC 304 : bool enable_streaming = false;
3324 rhaas 162 ECB :
3142 andres 163 CBC 304 : data = palloc0(sizeof(TestDecodingData));
3324 rhaas 164 304 : data->context = AllocSetContextCreate(ctx->context,
165 : "text conversion context",
2416 tgl 166 ECB : ALLOCSET_DEFAULT_SIZES);
3324 rhaas 167 GIC 304 : data->include_xids = true;
3324 rhaas 168 CBC 304 : data->include_timestamp = false;
3142 andres 169 304 : data->skip_empty_xacts = false;
2902 andres 170 GIC 304 : data->only_local = false;
3324 rhaas 171 ECB :
3324 rhaas 172 GIC 304 : ctx->output_plugin_private = data;
3324 rhaas 173 ECB :
3324 rhaas 174 GIC 304 : opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
1845 peter_e 175 CBC 304 : opt->receive_rewrites = false;
176 :
3324 rhaas 177 641 : foreach(option, ctx->output_plugin_options)
178 : {
3324 rhaas 179 GIC 340 : DefElem *elem = lfirst(option);
3324 rhaas 180 ECB :
3324 rhaas 181 GBC 340 : Assert(elem->arg == NULL || IsA(elem->arg, String));
3324 rhaas 182 ECB :
3324 rhaas 183 CBC 340 : if (strcmp(elem->defname, "include-xids") == 0)
184 : {
185 : /* if option does not provide a value, it means its value is true */
3324 rhaas 186 GIC 161 : if (elem->arg == NULL)
3324 rhaas 187 UIC 0 : data->include_xids = true;
3324 rhaas 188 CBC 161 : else if (!parse_bool(strVal(elem->arg), &data->include_xids))
3324 rhaas 189 GIC 2 : ereport(ERROR,
3324 rhaas 190 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2118 tgl 191 EUB : errmsg("could not parse value \"%s\" for parameter \"%s\"",
2118 tgl 192 ECB : strVal(elem->arg), elem->defname)));
3324 rhaas 193 EUB : }
3324 rhaas 194 GIC 179 : else if (strcmp(elem->defname, "include-timestamp") == 0)
195 : {
196 1 : if (elem->arg == NULL)
3324 rhaas 197 UIC 0 : data->include_timestamp = true;
3324 rhaas 198 CBC 1 : else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
3324 rhaas 199 UIC 0 : ereport(ERROR,
200 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
201 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
2118 tgl 202 ECB : strVal(elem->arg), elem->defname)));
3324 rhaas 203 EUB : }
3324 rhaas 204 CBC 178 : else if (strcmp(elem->defname, "force-binary") == 0)
3324 rhaas 205 EUB : {
206 : bool force_binary;
207 :
3324 rhaas 208 GIC 6 : if (elem->arg == NULL)
3324 rhaas 209 UIC 0 : continue;
3324 rhaas 210 CBC 6 : else if (!parse_bool(strVal(elem->arg), &force_binary))
3324 rhaas 211 LBC 0 : ereport(ERROR,
212 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2118 tgl 213 ECB : errmsg("could not parse value \"%s\" for parameter \"%s\"",
214 : strVal(elem->arg), elem->defname)));
215 :
3324 rhaas 216 CBC 6 : if (force_binary)
3324 rhaas 217 GBC 2 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
3324 rhaas 218 ECB : }
3142 andres 219 GBC 172 : else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
220 : {
221 :
3142 andres 222 GIC 159 : if (elem->arg == NULL)
3142 andres 223 UIC 0 : data->skip_empty_xacts = true;
3142 andres 224 CBC 159 : else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
3142 andres 225 UIC 0 : ereport(ERROR,
226 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2118 tgl 227 ECB : errmsg("could not parse value \"%s\" for parameter \"%s\"",
2118 tgl 228 EUB : strVal(elem->arg), elem->defname)));
3142 andres 229 ECB : }
2902 andres 230 GBC 13 : else if (strcmp(elem->defname, "only-local") == 0)
231 : {
232 :
2902 andres 233 GIC 3 : if (elem->arg == NULL)
2902 andres 234 UIC 0 : data->only_local = true;
2902 andres 235 CBC 3 : else if (!parse_bool(strVal(elem->arg), &data->only_local))
2902 andres 236 UIC 0 : ereport(ERROR,
237 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2118 tgl 238 ECB : errmsg("could not parse value \"%s\" for parameter \"%s\"",
2118 tgl 239 EUB : strVal(elem->arg), elem->defname)));
2902 andres 240 ECB : }
1845 peter_e 241 GBC 10 : else if (strcmp(elem->defname, "include-rewrites") == 0)
242 : {
243 :
1845 peter_e 244 GIC 1 : if (elem->arg == NULL)
1845 peter_e 245 UIC 0 : continue;
1845 peter_e 246 CBC 1 : else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
1845 peter_e 247 UIC 0 : ereport(ERROR,
1845 peter_e 248 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1845 peter_e 249 EUB : errmsg("could not parse value \"%s\" for parameter \"%s\"",
1845 peter_e 250 ECB : strVal(elem->arg), elem->defname)));
1845 peter_e 251 EUB : }
974 akapila 252 GIC 9 : else if (strcmp(elem->defname, "stream-changes") == 0)
253 : {
254 8 : if (elem->arg == NULL)
974 akapila 255 UIC 0 : continue;
974 akapila 256 GIC 8 : else if (!parse_bool(strVal(elem->arg), &enable_streaming))
974 akapila 257 UIC 0 : ereport(ERROR,
974 akapila 258 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
259 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
260 : strVal(elem->arg), elem->defname)));
261 : }
262 : else
263 : {
3324 rhaas 264 GIC 1 : ereport(ERROR,
265 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3324 rhaas 266 ECB : errmsg("option \"%s\" = \"%s\" is unknown",
267 : elem->defname,
268 : elem->arg ? strVal(elem->arg) : "(null)")));
269 : }
270 : }
974 akapila 271 :
974 akapila 272 GIC 301 : ctx->streaming &= enable_streaming;
3324 rhaas 273 CBC 301 : }
274 :
275 : /* cleanup this plugin's resources */
3324 rhaas 276 ECB : static void
3324 rhaas 277 CBC 290 : pg_decode_shutdown(LogicalDecodingContext *ctx)
278 : {
3324 rhaas 279 GIC 290 : TestDecodingData *data = ctx->output_plugin_private;
280 :
3324 rhaas 281 ECB : /* cleanup our own resources via memory context reset */
3324 rhaas 282 GIC 290 : MemoryContextDelete(data->context);
3324 rhaas 283 CBC 290 : }
284 :
3324 rhaas 285 ECB : /* BEGIN callback */
286 : static void
3324 rhaas 287 CBC 415 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
3324 rhaas 288 ECB : {
3324 rhaas 289 GIC 415 : TestDecodingData *data = ctx->output_plugin_private;
290 : TestDecodingTxnData *txndata =
873 akapila 291 415 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
292 :
293 415 : txndata->xact_wrote_changes = false;
873 akapila 294 CBC 415 : txn->output_plugin_private = txndata;
3324 rhaas 295 ECB :
296 : /*
332 tgl 297 : * If asked to skip empty transactions, we'll emit BEGIN at the point
298 : * where the first operation is received for this transaction.
299 : */
3142 andres 300 GIC 415 : if (data->skip_empty_xacts)
3142 andres 301 CBC 385 : return;
302 :
303 30 : pg_output_begin(ctx, data, txn, true);
3142 andres 304 ECB : }
305 :
306 : static void
3142 andres 307 CBC 248 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
3142 andres 308 ECB : {
3142 andres 309 CBC 248 : OutputPluginPrepareWrite(ctx, last_write);
3324 rhaas 310 GIC 248 : if (data->include_xids)
311 25 : appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
312 : else
3324 rhaas 313 CBC 223 : appendStringInfoString(ctx->out, "BEGIN");
3142 andres 314 GIC 248 : OutputPluginWrite(ctx, last_write);
3324 rhaas 315 248 : }
3324 rhaas 316 ECB :
317 : /* COMMIT callback */
318 : static void
3324 rhaas 319 GIC 415 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
3324 rhaas 320 ECB : XLogRecPtr commit_lsn)
321 : {
3324 rhaas 322 GIC 415 : TestDecodingData *data = ctx->output_plugin_private;
873 akapila 323 CBC 415 : TestDecodingTxnData *txndata = txn->output_plugin_private;
324 415 : bool xact_wrote_changes = txndata->xact_wrote_changes;
325 :
326 415 : pfree(txndata);
327 415 : txn->output_plugin_private = NULL;
3324 rhaas 328 ECB :
873 akapila 329 GIC 415 : if (data->skip_empty_xacts && !xact_wrote_changes)
3142 andres 330 CBC 173 : return;
331 :
3324 rhaas 332 242 : OutputPluginPrepareWrite(ctx, true);
333 242 : if (data->include_xids)
3324 rhaas 334 GIC 25 : appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
335 : else
3324 rhaas 336 CBC 217 : appendStringInfoString(ctx->out, "COMMIT");
337 :
3324 rhaas 338 GIC 242 : if (data->include_timestamp)
339 1 : appendStringInfo(ctx->out, " (at %s)",
340 : timestamptz_to_str(txn->xact_time.commit_time));
3324 rhaas 341 ECB :
3324 rhaas 342 GIC 242 : OutputPluginWrite(ctx, true);
3324 rhaas 343 ECB : }
344 :
830 akapila 345 : /* BEGIN PREPARE callback */
346 : static void
830 akapila 347 CBC 6 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
830 akapila 348 ECB : {
830 akapila 349 GIC 6 : TestDecodingData *data = ctx->output_plugin_private;
350 : TestDecodingTxnData *txndata =
351 6 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
352 :
353 6 : txndata->xact_wrote_changes = false;
830 akapila 354 CBC 6 : txn->output_plugin_private = txndata;
830 akapila 355 ECB :
356 : /*
332 tgl 357 EUB : * If asked to skip empty transactions, we'll emit BEGIN at the point
358 : * where the first operation is received for this transaction.
359 : */
830 akapila 360 GIC 6 : if (data->skip_empty_xacts)
361 6 : return;
830 akapila 362 ECB :
830 akapila 363 UIC 0 : pg_output_begin(ctx, data, txn, true);
364 : }
830 akapila 365 ECB :
366 : /* PREPARE callback */
367 : static void
830 akapila 368 GIC 6 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
369 : XLogRecPtr prepare_lsn)
370 : {
371 6 : TestDecodingData *data = ctx->output_plugin_private;
830 akapila 372 CBC 6 : TestDecodingTxnData *txndata = txn->output_plugin_private;
830 akapila 373 EUB :
374 : /*
382 alvherre 375 ECB : * If asked to skip empty transactions, we'll emit PREPARE at the point
376 : * where the first operation is received for this transaction.
377 : */
830 akapila 378 CBC 6 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
830 akapila 379 UIC 0 : return;
830 akapila 380 ECB :
830 akapila 381 GBC 6 : OutputPluginPrepareWrite(ctx, true);
382 :
830 akapila 383 CBC 6 : appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
830 akapila 384 GBC 6 : quote_literal_cstr(txn->gid));
385 :
830 akapila 386 GIC 6 : if (data->include_xids)
830 akapila 387 LBC 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
388 :
830 akapila 389 GIC 6 : if (data->include_timestamp)
830 akapila 390 UIC 0 : appendStringInfo(ctx->out, " (at %s)",
391 : timestamptz_to_str(txn->xact_time.prepare_time));
830 akapila 392 ECB :
830 akapila 393 GIC 6 : OutputPluginWrite(ctx, true);
394 : }
830 akapila 395 ECB :
396 : /* COMMIT PREPARED callback */
397 : static void
830 akapila 398 GIC 6 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
830 akapila 399 ECB : XLogRecPtr commit_lsn)
400 : {
830 akapila 401 GIC 6 : TestDecodingData *data = ctx->output_plugin_private;
830 akapila 402 ECB :
830 akapila 403 GBC 6 : OutputPluginPrepareWrite(ctx, true);
404 :
830 akapila 405 CBC 6 : appendStringInfo(ctx->out, "COMMIT PREPARED %s",
830 akapila 406 GBC 6 : quote_literal_cstr(txn->gid));
407 :
830 akapila 408 GIC 6 : if (data->include_xids)
830 akapila 409 LBC 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
830 akapila 410 ECB :
830 akapila 411 GIC 6 : if (data->include_timestamp)
830 akapila 412 UIC 0 : appendStringInfo(ctx->out, " (at %s)",
413 : timestamptz_to_str(txn->xact_time.commit_time));
830 akapila 414 ECB :
830 akapila 415 GIC 6 : OutputPluginWrite(ctx, true);
416 6 : }
417 :
418 : /* ROLLBACK PREPARED callback */
830 akapila 419 ECB : static void
830 akapila 420 GIC 1 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
830 akapila 421 ECB : ReorderBufferTXN *txn,
422 : XLogRecPtr prepare_end_lsn,
423 : TimestampTz prepare_time)
424 : {
830 akapila 425 GIC 1 : TestDecodingData *data = ctx->output_plugin_private;
830 akapila 426 ECB :
830 akapila 427 GBC 1 : OutputPluginPrepareWrite(ctx, true);
428 :
830 akapila 429 CBC 1 : appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
830 akapila 430 GBC 1 : quote_literal_cstr(txn->gid));
431 :
830 akapila 432 GIC 1 : if (data->include_xids)
830 akapila 433 LBC 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
830 akapila 434 ECB :
830 akapila 435 GIC 1 : if (data->include_timestamp)
830 akapila 436 UIC 0 : appendStringInfo(ctx->out, " (at %s)",
437 : timestamptz_to_str(txn->xact_time.commit_time));
438 :
830 akapila 439 GIC 1 : OutputPluginWrite(ctx, true);
440 1 : }
441 :
442 : /*
443 : * Filter out two-phase transactions.
830 akapila 444 ECB : *
445 : * Each plugin can implement its own filtering logic. Here we demonstrate a
446 : * simple logic by checking the GID. If the GID contains the "_nodecode"
447 : * substring, then we filter it out.
448 : */
449 : static bool
740 akapila 450 CBC 115 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
451 : const char *gid)
452 : {
830 akapila 453 GIC 115 : if (strstr(gid, "_nodecode") != NULL)
830 akapila 454 CBC 8 : return true;
455 :
830 akapila 456 GIC 107 : return false;
830 akapila 457 ECB : }
458 :
2902 andres 459 : static bool
2902 andres 460 CBC 1194875 : pg_decode_filter(LogicalDecodingContext *ctx,
2902 andres 461 ECB : RepOriginId origin_id)
462 : {
2902 andres 463 GIC 1194875 : TestDecodingData *data = ctx->output_plugin_private;
464 :
465 1194875 : if (data->only_local && origin_id != InvalidRepOriginId)
466 9 : return true;
467 1194866 : return false;
468 : }
469 :
470 : /*
471 : * Print literal `outputstr' already represented as string of type `typid'
3324 rhaas 472 ECB : * into stringbuf `s'.
473 : *
474 : * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
475 : * if standard_conforming_strings were enabled.
476 : */
477 : static void
3324 rhaas 478 CBC 175907 : print_literal(StringInfo s, Oid typid, char *outputstr)
479 : {
480 : const char *valptr;
481 :
3324 rhaas 482 GIC 175907 : switch (typid)
483 : {
484 60217 : case INT2OID:
485 : case INT4OID:
3324 rhaas 486 ECB : case INT8OID:
487 : case OIDOID:
488 : case FLOAT4OID:
3324 rhaas 489 EUB : case FLOAT8OID:
490 : case NUMERICOID:
491 : /* NB: We don't care about Inf, NaN et al. */
3324 rhaas 492 GBC 60217 : appendStringInfoString(s, outputstr);
3324 rhaas 493 GIC 60217 : break;
3324 rhaas 494 EUB :
3324 rhaas 495 UBC 0 : case BITOID:
3324 rhaas 496 EUB : case VARBITOID:
3324 rhaas 497 UIC 0 : appendStringInfo(s, "B'%s'", outputstr);
3324 rhaas 498 UBC 0 : break;
3324 rhaas 499 EUB :
3324 rhaas 500 UIC 0 : case BOOLOID:
3324 rhaas 501 LBC 0 : if (strcmp(outputstr, "t") == 0)
502 0 : appendStringInfoString(s, "true");
3324 rhaas 503 ECB : else
3324 rhaas 504 UIC 0 : appendStringInfoString(s, "false");
3324 rhaas 505 LBC 0 : break;
506 :
3324 rhaas 507 CBC 115690 : default:
508 115690 : appendStringInfoChar(s, '\'');
509 5427135 : for (valptr = outputstr; *valptr; valptr++)
510 : {
511 5311445 : char ch = *valptr;
3324 rhaas 512 ECB :
3324 rhaas 513 GIC 5311445 : if (SQL_STR_DOUBLE(ch, false))
3324 rhaas 514 CBC 64 : appendStringInfoChar(s, ch);
3324 rhaas 515 GIC 5311445 : appendStringInfoChar(s, ch);
516 : }
517 115690 : appendStringInfoChar(s, '\'');
3324 rhaas 518 CBC 115690 : break;
519 : }
3324 rhaas 520 GIC 175907 : }
521 :
522 : /* print the tuple 'tuple' into the StringInfo s */
3324 rhaas 523 ECB : static void
3324 rhaas 524 GIC 145521 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
525 : {
526 : int natt;
527 :
528 : /* print all columns individually */
529 347185 : for (natt = 0; natt < tupdesc->natts; natt++)
530 : {
531 : Form_pg_attribute attr; /* the attribute itself */
3324 rhaas 532 ECB : Oid typid; /* type of current attribute */
533 : Oid typoutput; /* output function */
534 : bool typisvarlena;
535 : Datum origval; /* possibly toasted Datum */
536 : bool isnull; /* column is null? */
537 :
2058 andres 538 CBC 201664 : attr = TupleDescAttr(tupdesc, natt);
3324 rhaas 539 ECB :
540 : /*
541 : * don't print dropped columns, we can't be sure everything is
542 : * available for them
543 : */
3324 rhaas 544 GIC 201664 : if (attr->attisdropped)
3324 rhaas 545 CBC 5130 : continue;
3324 rhaas 546 EUB :
547 : /*
3324 rhaas 548 ECB : * Don't print system columns, oid will already have been printed if
549 : * present.
550 : */
3324 rhaas 551 CBC 201592 : if (attr->attnum < 0)
3324 rhaas 552 UIC 0 : continue;
3324 rhaas 553 ECB :
3324 rhaas 554 CBC 201592 : typid = attr->atttypid;
555 :
556 : /* get Datum from tuple */
2843 andres 557 201592 : origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
3324 rhaas 558 ECB :
3324 rhaas 559 GIC 201592 : if (isnull && skip_nulls)
560 5058 : continue;
3324 rhaas 561 ECB :
562 : /* print attribute name */
3324 rhaas 563 CBC 196534 : appendStringInfoChar(s, ' ');
3324 rhaas 564 GIC 196534 : appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
565 :
3324 rhaas 566 ECB : /* print attribute type */
3324 rhaas 567 GIC 196534 : appendStringInfoChar(s, '[');
568 196534 : appendStringInfoString(s, format_type_be(typid));
569 196534 : appendStringInfoChar(s, ']');
3324 rhaas 570 ECB :
571 : /* query output function */
3324 rhaas 572 GIC 196534 : getTypeOutputInfo(typid,
3324 rhaas 573 ECB : &typoutput, &typisvarlena);
574 :
575 : /* print separator */
3324 rhaas 576 CBC 196534 : appendStringInfoChar(s, ':');
3324 rhaas 577 ECB :
578 : /* print data */
3324 rhaas 579 GIC 196534 : if (isnull)
580 20615 : appendStringInfoString(s, "null");
581 175919 : else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
582 12 : appendStringInfoString(s, "unchanged-toast-datum");
583 175907 : else if (!typisvarlena)
3324 rhaas 584 CBC 60221 : print_literal(s, typid,
3324 rhaas 585 ECB : OidOutputFunctionCall(typoutput, origval));
586 : else
587 : {
3260 bruce 588 : Datum val; /* definitely detoasted Datum */
589 :
3324 rhaas 590 GIC 115686 : val = PointerGetDatum(PG_DETOAST_DATUM(origval));
591 115686 : print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
592 : }
593 : }
3324 rhaas 594 CBC 145521 : }
595 :
596 : /*
597 : * callback for individual changed tuples
598 : */
599 : static void
3324 rhaas 600 GIC 150510 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
601 : Relation relation, ReorderBufferChange *change)
602 : {
3324 rhaas 603 ECB : TestDecodingData *data;
873 akapila 604 : TestDecodingTxnData *txndata;
605 : Form_pg_class class_form;
606 : TupleDesc tupdesc;
3324 rhaas 607 : MemoryContext old;
608 :
3324 rhaas 609 CBC 150510 : data = ctx->output_plugin_private;
873 akapila 610 GIC 150510 : txndata = txn->output_plugin_private;
3142 andres 611 ECB :
612 : /* output BEGIN if we haven't yet */
873 akapila 613 CBC 150510 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
3142 andres 614 ECB : {
3142 andres 615 GIC 212 : pg_output_begin(ctx, data, txn, false);
616 : }
873 akapila 617 CBC 150510 : txndata->xact_wrote_changes = true;
618 :
3324 rhaas 619 150510 : class_form = RelationGetForm(relation);
3324 rhaas 620 GIC 150510 : tupdesc = RelationGetDescr(relation);
3324 rhaas 621 ECB :
622 : /* Avoid leaking memory by using and resetting our own context */
3324 rhaas 623 CBC 150510 : old = MemoryContextSwitchTo(data->context);
3324 rhaas 624 ECB :
3324 rhaas 625 CBC 150510 : OutputPluginPrepareWrite(ctx, true);
626 :
627 150510 : appendStringInfoString(ctx->out, "table ");
3324 rhaas 628 GIC 150510 : appendStringInfoString(ctx->out,
1165 alvherre 629 CBC 150510 : quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
1845 peter_e 630 GIC 150510 : class_form->relrewrite ?
1845 peter_e 631 CBC 1 : get_rel_name(class_form->relrewrite) :
2118 tgl 632 ECB : NameStr(class_form->relname)));
2890 peter_e 633 CBC 150510 : appendStringInfoChar(ctx->out, ':');
3324 rhaas 634 EUB :
3324 rhaas 635 GIC 150510 : switch (change->action)
3324 rhaas 636 ECB : {
3324 rhaas 637 CBC 132945 : case REORDER_BUFFER_CHANGE_INSERT:
3324 rhaas 638 GIC 132945 : appendStringInfoString(ctx->out, " INSERT:");
3320 tgl 639 CBC 132945 : if (change->data.tp.newtuple == NULL)
3324 rhaas 640 LBC 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
3324 rhaas 641 ECB : else
3324 rhaas 642 CBC 132945 : tuple_to_stringinfo(ctx->out, tupdesc,
3320 tgl 643 GIC 132945 : &change->data.tp.newtuple->tuple,
3324 rhaas 644 ECB : false);
3324 rhaas 645 CBC 132945 : break;
646 7543 : case REORDER_BUFFER_CHANGE_UPDATE:
3324 rhaas 647 GIC 7543 : appendStringInfoString(ctx->out, " UPDATE:");
3320 tgl 648 CBC 7543 : if (change->data.tp.oldtuple != NULL)
649 : {
3324 rhaas 650 GIC 18 : appendStringInfoString(ctx->out, " old-key:");
3324 rhaas 651 CBC 18 : tuple_to_stringinfo(ctx->out, tupdesc,
3320 tgl 652 GBC 18 : &change->data.tp.oldtuple->tuple,
653 : true);
3324 rhaas 654 CBC 18 : appendStringInfoString(ctx->out, " new-tuple:");
3324 rhaas 655 ECB : }
656 :
3320 tgl 657 CBC 7543 : if (change->data.tp.newtuple == NULL)
3324 rhaas 658 LBC 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
3324 rhaas 659 ECB : else
3324 rhaas 660 GIC 7543 : tuple_to_stringinfo(ctx->out, tupdesc,
3320 tgl 661 7543 : &change->data.tp.newtuple->tuple,
3324 rhaas 662 ECB : false);
3324 rhaas 663 CBC 7543 : break;
3324 rhaas 664 GIC 10022 : case REORDER_BUFFER_CHANGE_DELETE:
665 10022 : appendStringInfoString(ctx->out, " DELETE:");
3324 rhaas 666 ECB :
667 : /* if there was no PK, we only know that a delete happened */
3320 tgl 668 GIC 10022 : if (change->data.tp.oldtuple == NULL)
3324 rhaas 669 CBC 5007 : appendStringInfoString(ctx->out, " (no-tuple-data)");
3324 rhaas 670 EUB : /* In DELETE, only the replica identity is present; display that */
671 : else
3324 rhaas 672 GIC 5015 : tuple_to_stringinfo(ctx->out, tupdesc,
3320 tgl 673 5015 : &change->data.tp.oldtuple->tuple,
3324 rhaas 674 ECB : true);
3324 rhaas 675 CBC 10022 : break;
3320 tgl 676 UIC 0 : default:
3320 tgl 677 LBC 0 : Assert(false);
3324 rhaas 678 ECB : }
679 :
3324 rhaas 680 GIC 150510 : MemoryContextSwitchTo(old);
3324 rhaas 681 CBC 150510 : MemoryContextReset(data->context);
682 :
3324 rhaas 683 GIC 150510 : OutputPluginWrite(ctx, true);
684 150510 : }
685 :
686 : static void
1828 peter_e 687 6 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
688 : int nrelations, Relation relations[], ReorderBufferChange *change)
1828 peter_e 689 ECB : {
690 : TestDecodingData *data;
691 : TestDecodingTxnData *txndata;
692 : MemoryContext old;
693 : int i;
694 :
1828 peter_e 695 CBC 6 : data = ctx->output_plugin_private;
873 akapila 696 GIC 6 : txndata = txn->output_plugin_private;
1828 peter_e 697 ECB :
698 : /* output BEGIN if we haven't yet */
873 akapila 699 GIC 6 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
1828 peter_e 700 ECB : {
1828 peter_e 701 GIC 6 : pg_output_begin(ctx, data, txn, false);
1828 peter_e 702 ECB : }
873 akapila 703 GIC 6 : txndata->xact_wrote_changes = true;
1828 peter_e 704 ECB :
705 : /* Avoid leaking memory by using and resetting our own context */
1828 peter_e 706 CBC 6 : old = MemoryContextSwitchTo(data->context);
707 :
708 6 : OutputPluginPrepareWrite(ctx, true);
1828 peter_e 709 ECB :
1828 peter_e 710 GIC 6 : appendStringInfoString(ctx->out, "table ");
1828 peter_e 711 ECB :
1828 peter_e 712 CBC 13 : for (i = 0; i < nrelations; i++)
1828 peter_e 713 ECB : {
1828 peter_e 714 GIC 7 : if (i > 0)
715 1 : appendStringInfoString(ctx->out, ", ");
1828 peter_e 716 ECB :
1828 peter_e 717 GIC 7 : appendStringInfoString(ctx->out,
1828 peter_e 718 CBC 7 : quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
719 7 : NameStr(relations[i]->rd_rel->relname)));
720 : }
1828 peter_e 721 ECB :
1828 peter_e 722 CBC 6 : appendStringInfoString(ctx->out, ": TRUNCATE:");
1828 peter_e 723 ECB :
1828 peter_e 724 CBC 6 : if (change->data.truncate.restart_seqs
1828 peter_e 725 GIC 5 : || change->data.truncate.cascade)
726 : {
1828 peter_e 727 CBC 1 : if (change->data.truncate.restart_seqs)
1375 drowley 728 GIC 1 : appendStringInfoString(ctx->out, " restart_seqs");
1828 peter_e 729 CBC 1 : if (change->data.truncate.cascade)
1375 drowley 730 1 : appendStringInfoString(ctx->out, " cascade");
731 : }
1828 peter_e 732 ECB : else
1828 peter_e 733 CBC 5 : appendStringInfoString(ctx->out, " (no-flags)");
734 :
1828 peter_e 735 GIC 6 : MemoryContextSwitchTo(old);
1828 peter_e 736 CBC 6 : MemoryContextReset(data->context);
737 :
1828 peter_e 738 GIC 6 : OutputPluginWrite(ctx, true);
739 6 : }
1828 peter_e 740 ECB :
2559 simon 741 : static void
2559 simon 742 GIC 8 : pg_decode_message(LogicalDecodingContext *ctx,
2559 simon 743 ECB : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
744 : const char *prefix, Size sz, const char *message)
745 : {
2559 simon 746 GIC 8 : OutputPluginPrepareWrite(ctx, true);
747 8 : appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
2559 simon 748 ECB : transactional, prefix, sz);
2559 simon 749 GIC 8 : appendBinaryStringInfo(ctx->out, message, sz);
750 8 : OutputPluginWrite(ctx, true);
2559 simon 751 CBC 8 : }
985 akapila 752 ECB :
753 : static void
985 akapila 754 GIC 11 : pg_decode_stream_start(LogicalDecodingContext *ctx,
755 : ReorderBufferTXN *txn)
756 : {
985 akapila 757 CBC 11 : TestDecodingData *data = ctx->output_plugin_private;
873 akapila 758 GIC 11 : TestDecodingTxnData *txndata = txn->output_plugin_private;
759 :
873 akapila 760 ECB : /*
761 : * Allocate the txn plugin data for the first stream in the transaction.
762 : */
873 akapila 763 GIC 11 : if (txndata == NULL)
764 : {
873 akapila 765 ECB : txndata =
873 akapila 766 CBC 7 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
767 7 : txndata->xact_wrote_changes = false;
873 akapila 768 GBC 7 : txn->output_plugin_private = txndata;
769 : }
770 :
873 akapila 771 GIC 11 : txndata->stream_wrote_changes = false;
940 akapila 772 CBC 11 : if (data->skip_empty_xacts)
940 akapila 773 GIC 11 : return;
940 akapila 774 LBC 0 : pg_output_stream_start(ctx, data, txn, true);
940 akapila 775 ECB : }
940 akapila 776 EUB :
777 : static void
940 akapila 778 CBC 6 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
940 akapila 779 ECB : {
940 akapila 780 CBC 6 : OutputPluginPrepareWrite(ctx, last_write);
985 akapila 781 GIC 6 : if (data->include_xids)
985 akapila 782 UIC 0 : appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
985 akapila 783 ECB : else
906 drowley 784 GIC 6 : appendStringInfoString(ctx->out, "opening a streamed block for transaction");
940 akapila 785 6 : OutputPluginWrite(ctx, last_write);
985 akapila 786 CBC 6 : }
985 akapila 787 ECB :
788 : static void
985 akapila 789 CBC 11 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
985 akapila 790 ECB : ReorderBufferTXN *txn)
791 : {
985 akapila 792 CBC 11 : TestDecodingData *data = ctx->output_plugin_private;
873 793 11 : TestDecodingTxnData *txndata = txn->output_plugin_private;
985 akapila 794 EUB :
873 akapila 795 GIC 11 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
940 akapila 796 CBC 5 : return;
940 akapila 797 ECB :
985 akapila 798 GIC 6 : OutputPluginPrepareWrite(ctx, true);
799 6 : if (data->include_xids)
985 akapila 800 UIC 0 : appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
985 akapila 801 ECB : else
906 drowley 802 GIC 6 : appendStringInfoString(ctx->out, "closing a streamed block for transaction");
985 akapila 803 6 : OutputPluginWrite(ctx, true);
804 : }
985 akapila 805 ECB :
806 : static void
985 akapila 807 GIC 3 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
808 : ReorderBufferTXN *txn,
809 : XLogRecPtr abort_lsn)
810 : {
811 3 : TestDecodingData *data = ctx->output_plugin_private;
985 akapila 812 ECB :
873 813 : /*
814 : * stream abort can be sent for an individual subtransaction but we
815 : * maintain the output_plugin_private only under the toptxn so if this is
816 : * not the toptxn then fetch the toptxn.
817 : */
23 akapila 818 GNC 3 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
873 akapila 819 GBC 3 : TestDecodingTxnData *txndata = toptxn->output_plugin_private;
820 3 : bool xact_wrote_changes = txndata->xact_wrote_changes;
821 :
23 akapila 822 GNC 3 : if (rbtxn_is_toptxn(txn))
873 akapila 823 ECB : {
873 akapila 824 LBC 0 : Assert(txn->output_plugin_private != NULL);
873 akapila 825 UIC 0 : pfree(txndata);
873 akapila 826 UBC 0 : txn->output_plugin_private = NULL;
873 akapila 827 EUB : }
828 :
873 akapila 829 GIC 3 : if (data->skip_empty_xacts && !xact_wrote_changes)
940 akapila 830 GBC 3 : return;
940 akapila 831 EUB :
985 akapila 832 UIC 0 : OutputPluginPrepareWrite(ctx, true);
833 0 : if (data->include_xids)
834 0 : appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
985 akapila 835 ECB : else
906 drowley 836 UIC 0 : appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
985 akapila 837 0 : OutputPluginWrite(ctx, true);
838 : }
985 akapila 839 ECB :
830 840 : static void
830 akapila 841 GIC 1 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
830 akapila 842 ECB : ReorderBufferTXN *txn,
830 akapila 843 EUB : XLogRecPtr prepare_lsn)
844 : {
830 akapila 845 CBC 1 : TestDecodingData *data = ctx->output_plugin_private;
830 akapila 846 GIC 1 : TestDecodingTxnData *txndata = txn->output_plugin_private;
830 akapila 847 ECB :
830 akapila 848 GBC 1 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
830 akapila 849 UBC 0 : return;
850 :
830 akapila 851 CBC 1 : OutputPluginPrepareWrite(ctx, true);
830 akapila 852 ECB :
830 akapila 853 GIC 1 : if (data->include_xids)
830 akapila 854 LBC 0 : appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
830 akapila 855 UBC 0 : quote_literal_cstr(txn->gid), txn->xid);
856 : else
830 akapila 857 GIC 1 : appendStringInfo(ctx->out, "preparing streamed transaction %s",
830 akapila 858 CBC 1 : quote_literal_cstr(txn->gid));
859 :
830 akapila 860 GIC 1 : if (data->include_timestamp)
830 akapila 861 UIC 0 : appendStringInfo(ctx->out, " (at %s)",
634 akapila 862 ECB : timestamptz_to_str(txn->xact_time.prepare_time));
863 :
830 akapila 864 GIC 1 : OutputPluginWrite(ctx, true);
865 : }
830 akapila 866 ECB :
985 867 : static void
985 akapila 868 CBC 4 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
869 : ReorderBufferTXN *txn,
985 akapila 870 ECB : XLogRecPtr commit_lsn)
871 : {
985 akapila 872 GIC 4 : TestDecodingData *data = ctx->output_plugin_private;
873 akapila 873 CBC 4 : TestDecodingTxnData *txndata = txn->output_plugin_private;
873 akapila 874 GBC 4 : bool xact_wrote_changes = txndata->xact_wrote_changes;
875 :
873 akapila 876 CBC 4 : pfree(txndata);
873 akapila 877 GIC 4 : txn->output_plugin_private = NULL;
985 akapila 878 ECB :
873 akapila 879 GBC 4 : if (data->skip_empty_xacts && !xact_wrote_changes)
940 akapila 880 UIC 0 : return;
940 akapila 881 ECB :
985 akapila 882 GIC 4 : OutputPluginPrepareWrite(ctx, true);
985 akapila 883 ECB :
985 akapila 884 GBC 4 : if (data->include_xids)
985 akapila 885 UIC 0 : appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
886 : else
906 drowley 887 CBC 4 : appendStringInfoString(ctx->out, "committing streamed transaction");
888 :
985 akapila 889 GIC 4 : if (data->include_timestamp)
985 akapila 890 UIC 0 : appendStringInfo(ctx->out, " (at %s)",
891 : timestamptz_to_str(txn->xact_time.commit_time));
892 :
985 akapila 893 GIC 4 : OutputPluginWrite(ctx, true);
894 : }
895 :
985 akapila 896 ECB : /*
897 : * In streaming mode, we don't display the changes as the transaction can abort
898 : * at a later point in time. We don't want users to see the changes until the
899 : * transaction is committed.
900 : */
901 : static void
985 akapila 902 CBC 63 : pg_decode_stream_change(LogicalDecodingContext *ctx,
903 : ReorderBufferTXN *txn,
904 : Relation relation,
985 akapila 905 ECB : ReorderBufferChange *change)
906 : {
985 akapila 907 CBC 63 : TestDecodingData *data = ctx->output_plugin_private;
873 akapila 908 GIC 63 : TestDecodingTxnData *txndata = txn->output_plugin_private;
985 akapila 909 ECB :
910 : /* output stream start if we haven't yet */
873 akapila 911 CBC 63 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
940 akapila 912 ECB : {
940 akapila 913 GBC 6 : pg_output_stream_start(ctx, data, txn, false);
914 : }
873 akapila 915 CBC 63 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
940 akapila 916 ECB :
985 akapila 917 CBC 63 : OutputPluginPrepareWrite(ctx, true);
985 akapila 918 GIC 63 : if (data->include_xids)
985 akapila 919 UIC 0 : appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
920 : else
906 drowley 921 GIC 63 : appendStringInfoString(ctx->out, "streaming change for transaction");
985 akapila 922 63 : OutputPluginWrite(ctx, true);
923 63 : }
924 :
985 akapila 925 ECB : /*
926 : * In streaming mode, we don't display the contents for transactional messages
927 : * as the transaction can abort at a later point in time. We don't want users to
928 : * see the message contents until the transaction is committed.
929 : */
930 : static void
985 akapila 931 CBC 3 : pg_decode_stream_message(LogicalDecodingContext *ctx,
932 : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
985 akapila 933 ECB : const char *prefix, Size sz, const char *message)
934 : {
985 akapila 935 GIC 3 : OutputPluginPrepareWrite(ctx, true);
936 :
937 3 : if (transactional)
985 akapila 938 EUB : {
985 akapila 939 GIC 3 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
985 akapila 940 EUB : transactional, prefix, sz);
941 : }
942 : else
985 akapila 943 ECB : {
985 akapila 944 LBC 0 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
945 : transactional, prefix, sz);
985 akapila 946 UIC 0 : appendBinaryStringInfo(ctx->out, message, sz);
947 : }
948 :
985 akapila 949 GIC 3 : OutputPluginWrite(ctx, true);
950 3 : }
985 akapila 951 EUB :
952 : /*
953 : * In streaming mode, we don't display the detailed information of Truncate.
954 : * See pg_decode_stream_change.
955 : */
956 : static void
985 akapila 957 UIC 0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
985 akapila 958 EUB : int nrelations, Relation relations[],
959 : ReorderBufferChange *change)
960 : {
985 akapila 961 UIC 0 : TestDecodingData *data = ctx->output_plugin_private;
873 akapila 962 UBC 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
963 :
964 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
940 akapila 965 EUB : {
940 akapila 966 UBC 0 : pg_output_stream_start(ctx, data, txn, false);
967 : }
873 968 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
940 akapila 969 EUB :
985 akapila 970 UBC 0 : OutputPluginPrepareWrite(ctx, true);
985 akapila 971 UIC 0 : if (data->include_xids)
972 0 : appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
973 : else
906 drowley 974 0 : appendStringInfoString(ctx->out, "streaming truncate for transaction");
985 akapila 975 0 : OutputPluginWrite(ctx, true);
976 0 : }
|