LCOV - differential code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 83.4 % 403 336 14 43 10 22 171 2 141 34 179 1 2
Current Date: 2023-04-08 15:15:32 Functions: 96.4 % 28 27 1 26 1 1 26
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * test_decoding.c
       4                 :  *        example logical decoding output plugin
       5                 :  *
       6                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * IDENTIFICATION
       9                 :  *        contrib/test_decoding/test_decoding.c
      10                 :  *
      11                 :  *-------------------------------------------------------------------------
      12                 :  */
      13                 : #include "postgres.h"
      14                 : 
      15                 : #include "catalog/pg_type.h"
      16                 : 
      17                 : #include "replication/logical.h"
      18                 : #include "replication/origin.h"
      19                 : 
      20                 : #include "utils/builtins.h"
      21                 : #include "utils/lsyscache.h"
      22                 : #include "utils/memutils.h"
      23                 : #include "utils/rel.h"
      24                 : 
      25 CBC          92 : PG_MODULE_MAGIC;
      26                 : 
      27                 : typedef struct
      28                 : {
      29                 :     MemoryContext context;
      30                 :     bool        include_xids;
      31                 :     bool        include_timestamp;
      32                 :     bool        skip_empty_xacts;
      33                 :     bool        only_local;
      34                 : } TestDecodingData;
      35                 : 
      36                 : /*
      37                 :  * Maintain the per-transaction level variables to track whether the
      38                 :  * transaction and or streams have written any changes. In streaming mode the
      39                 :  * transaction can be decoded in streams so along with maintaining whether the
      40                 :  * transaction has written any changes, we also need to track whether the
      41                 :  * current stream has written any changes. This is required so that if user
      42                 :  * has requested to skip the empty transactions we can skip the empty streams
      43                 :  * even though the transaction has written some changes.
      44                 :  */
      45                 : typedef struct
      46                 : {
      47                 :     bool        xact_wrote_changes;
      48                 :     bool        stream_wrote_changes;
      49                 : } TestDecodingTxnData;
      50                 : 
      51                 : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      52                 :                               bool is_init);
      53                 : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
      54                 : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
      55                 :                                 ReorderBufferTXN *txn);
      56                 : static void pg_output_begin(LogicalDecodingContext *ctx,
      57                 :                             TestDecodingData *data,
      58                 :                             ReorderBufferTXN *txn,
      59                 :                             bool last_write);
      60                 : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
      61                 :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      62                 : static void pg_decode_change(LogicalDecodingContext *ctx,
      63                 :                              ReorderBufferTXN *txn, Relation relation,
      64                 :                              ReorderBufferChange *change);
      65                 : static void pg_decode_truncate(LogicalDecodingContext *ctx,
      66                 :                                ReorderBufferTXN *txn,
      67                 :                                int nrelations, Relation relations[],
      68                 :                                ReorderBufferChange *change);
      69                 : static bool pg_decode_filter(LogicalDecodingContext *ctx,
      70                 :                              RepOriginId origin_id);
      71                 : static void pg_decode_message(LogicalDecodingContext *ctx,
      72                 :                               ReorderBufferTXN *txn, XLogRecPtr lsn,
      73                 :                               bool transactional, const char *prefix,
      74                 :                               Size sz, const char *message);
      75                 : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
      76                 :                                      TransactionId xid,
      77                 :                                      const char *gid);
      78                 : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
      79                 :                                         ReorderBufferTXN *txn);
      80                 : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
      81                 :                                   ReorderBufferTXN *txn,
      82                 :                                   XLogRecPtr prepare_lsn);
      83                 : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
      84                 :                                           ReorderBufferTXN *txn,
      85                 :                                           XLogRecPtr commit_lsn);
      86                 : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
      87                 :                                             ReorderBufferTXN *txn,
      88                 :                                             XLogRecPtr prepare_end_lsn,
      89                 :                                             TimestampTz prepare_time);
      90                 : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
      91                 :                                    ReorderBufferTXN *txn);
      92                 : static void pg_output_stream_start(LogicalDecodingContext *ctx,
      93                 :                                    TestDecodingData *data,
      94                 :                                    ReorderBufferTXN *txn,
      95                 :                                    bool last_write);
      96                 : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
      97                 :                                   ReorderBufferTXN *txn);
      98                 : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
      99                 :                                    ReorderBufferTXN *txn,
     100                 :                                    XLogRecPtr abort_lsn);
     101                 : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     102                 :                                      ReorderBufferTXN *txn,
     103                 :                                      XLogRecPtr prepare_lsn);
     104                 : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
     105                 :                                     ReorderBufferTXN *txn,
     106                 :                                     XLogRecPtr commit_lsn);
     107                 : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
     108                 :                                     ReorderBufferTXN *txn,
     109                 :                                     Relation relation,
     110                 :                                     ReorderBufferChange *change);
     111                 : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
     112                 :                                      ReorderBufferTXN *txn, XLogRecPtr lsn,
     113                 :                                      bool transactional, const char *prefix,
     114                 :                                      Size sz, const char *message);
     115                 : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
     116                 :                                       ReorderBufferTXN *txn,
     117 ECB             :                                       int nrelations, Relation relations[],
     118                 :                                       ReorderBufferChange *change);
     119                 : 
     120                 : void
     121 GIC          92 : _PG_init(void)
     122                 : {
     123                 :     /* other plugins can perform things here */
     124 CBC          92 : }
     125                 : 
     126 ECB             : /* specify output plugin callbacks */
     127                 : void
     128 CBC         304 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     129 ECB             : {
     130 CBC         304 :     cb->startup_cb = pg_decode_startup;
     131             304 :     cb->begin_cb = pg_decode_begin_txn;
     132             304 :     cb->change_cb = pg_decode_change;
     133             304 :     cb->truncate_cb = pg_decode_truncate;
     134             304 :     cb->commit_cb = pg_decode_commit_txn;
     135             304 :     cb->filter_by_origin_cb = pg_decode_filter;
     136             304 :     cb->shutdown_cb = pg_decode_shutdown;
     137             304 :     cb->message_cb = pg_decode_message;
     138             304 :     cb->filter_prepare_cb = pg_decode_filter_prepare;
     139             304 :     cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
     140             304 :     cb->prepare_cb = pg_decode_prepare_txn;
     141             304 :     cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
     142             304 :     cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
     143             304 :     cb->stream_start_cb = pg_decode_stream_start;
     144             304 :     cb->stream_stop_cb = pg_decode_stream_stop;
     145             304 :     cb->stream_abort_cb = pg_decode_stream_abort;
     146 GIC         304 :     cb->stream_prepare_cb = pg_decode_stream_prepare;
     147             304 :     cb->stream_commit_cb = pg_decode_stream_commit;
     148             304 :     cb->stream_change_cb = pg_decode_stream_change;
     149             304 :     cb->stream_message_cb = pg_decode_stream_message;
     150 CBC         304 :     cb->stream_truncate_cb = pg_decode_stream_truncate;
     151 GIC         304 : }
     152                 : 
     153                 : 
     154                 : /* initialize this plugin */
     155 ECB             : static void
     156 GIC         304 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     157 ECB             :                   bool is_init)
     158                 : {
     159                 :     ListCell   *option;
     160                 :     TestDecodingData *data;
     161 CBC         304 :     bool        enable_streaming = false;
     162 ECB             : 
     163 CBC         304 :     data = palloc0(sizeof(TestDecodingData));
     164             304 :     data->context = AllocSetContextCreate(ctx->context,
     165                 :                                           "text conversion context",
     166 ECB             :                                           ALLOCSET_DEFAULT_SIZES);
     167 GIC         304 :     data->include_xids = true;
     168 CBC         304 :     data->include_timestamp = false;
     169             304 :     data->skip_empty_xacts = false;
     170 GIC         304 :     data->only_local = false;
     171 ECB             : 
     172 GIC         304 :     ctx->output_plugin_private = data;
     173 ECB             : 
     174 GIC         304 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     175 CBC         304 :     opt->receive_rewrites = false;
     176                 : 
     177             641 :     foreach(option, ctx->output_plugin_options)
     178                 :     {
     179 GIC         340 :         DefElem    *elem = lfirst(option);
     180 ECB             : 
     181 GBC         340 :         Assert(elem->arg == NULL || IsA(elem->arg, String));
     182 ECB             : 
     183 CBC         340 :         if (strcmp(elem->defname, "include-xids") == 0)
     184                 :         {
     185                 :             /* if option does not provide a value, it means its value is true */
     186 GIC         161 :             if (elem->arg == NULL)
     187 UIC           0 :                 data->include_xids = true;
     188 CBC         161 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     189 GIC           2 :                 ereport(ERROR,
     190 ECB             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     191 EUB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     192 ECB             :                                 strVal(elem->arg), elem->defname)));
     193 EUB             :         }
     194 GIC         179 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     195                 :         {
     196               1 :             if (elem->arg == NULL)
     197 UIC           0 :                 data->include_timestamp = true;
     198 CBC           1 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     199 UIC           0 :                 ereport(ERROR,
     200                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     201                 :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     202 ECB             :                                 strVal(elem->arg), elem->defname)));
     203 EUB             :         }
     204 CBC         178 :         else if (strcmp(elem->defname, "force-binary") == 0)
     205 EUB             :         {
     206                 :             bool        force_binary;
     207                 : 
     208 GIC           6 :             if (elem->arg == NULL)
     209 UIC           0 :                 continue;
     210 CBC           6 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     211 LBC           0 :                 ereport(ERROR,
     212                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     213 ECB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     214                 :                                 strVal(elem->arg), elem->defname)));
     215                 : 
     216 CBC           6 :             if (force_binary)
     217 GBC           2 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     218 ECB             :         }
     219 GBC         172 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     220                 :         {
     221                 : 
     222 GIC         159 :             if (elem->arg == NULL)
     223 UIC           0 :                 data->skip_empty_xacts = true;
     224 CBC         159 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     225 UIC           0 :                 ereport(ERROR,
     226                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     227 ECB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     228 EUB             :                                 strVal(elem->arg), elem->defname)));
     229 ECB             :         }
     230 GBC          13 :         else if (strcmp(elem->defname, "only-local") == 0)
     231                 :         {
     232                 : 
     233 GIC           3 :             if (elem->arg == NULL)
     234 UIC           0 :                 data->only_local = true;
     235 CBC           3 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     236 UIC           0 :                 ereport(ERROR,
     237                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     238 ECB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     239 EUB             :                                 strVal(elem->arg), elem->defname)));
     240 ECB             :         }
     241 GBC          10 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     242                 :         {
     243                 : 
     244 GIC           1 :             if (elem->arg == NULL)
     245 UIC           0 :                 continue;
     246 CBC           1 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     247 UIC           0 :                 ereport(ERROR,
     248 ECB             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     249 EUB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     250 ECB             :                                 strVal(elem->arg), elem->defname)));
     251 EUB             :         }
     252 GIC           9 :         else if (strcmp(elem->defname, "stream-changes") == 0)
     253                 :         {
     254               8 :             if (elem->arg == NULL)
     255 UIC           0 :                 continue;
     256 GIC           8 :             else if (!parse_bool(strVal(elem->arg), &enable_streaming))
     257 UIC           0 :                 ereport(ERROR,
     258 ECB             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     259                 :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     260                 :                                 strVal(elem->arg), elem->defname)));
     261                 :         }
     262                 :         else
     263                 :         {
     264 GIC           1 :             ereport(ERROR,
     265                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     266 ECB             :                      errmsg("option \"%s\" = \"%s\" is unknown",
     267                 :                             elem->defname,
     268                 :                             elem->arg ? strVal(elem->arg) : "(null)")));
     269                 :         }
     270                 :     }
     271                 : 
     272 GIC         301 :     ctx->streaming &= enable_streaming;
     273 CBC         301 : }
     274                 : 
     275                 : /* cleanup this plugin's resources */
     276 ECB             : static void
     277 CBC         290 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     278                 : {
     279 GIC         290 :     TestDecodingData *data = ctx->output_plugin_private;
     280                 : 
     281 ECB             :     /* cleanup our own resources via memory context reset */
     282 GIC         290 :     MemoryContextDelete(data->context);
     283 CBC         290 : }
     284                 : 
     285 ECB             : /* BEGIN callback */
     286                 : static void
     287 CBC         415 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     288 ECB             : {
     289 GIC         415 :     TestDecodingData *data = ctx->output_plugin_private;
     290                 :     TestDecodingTxnData *txndata =
     291             415 :     MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     292                 : 
     293             415 :     txndata->xact_wrote_changes = false;
     294 CBC         415 :     txn->output_plugin_private = txndata;
     295 ECB             : 
     296                 :     /*
     297                 :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     298                 :      * where the first operation is received for this transaction.
     299                 :      */
     300 GIC         415 :     if (data->skip_empty_xacts)
     301 CBC         385 :         return;
     302                 : 
     303              30 :     pg_output_begin(ctx, data, txn, true);
     304 ECB             : }
     305                 : 
     306                 : static void
     307 CBC         248 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     308 ECB             : {
     309 CBC         248 :     OutputPluginPrepareWrite(ctx, last_write);
     310 GIC         248 :     if (data->include_xids)
     311              25 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     312                 :     else
     313 CBC         223 :         appendStringInfoString(ctx->out, "BEGIN");
     314 GIC         248 :     OutputPluginWrite(ctx, last_write);
     315             248 : }
     316 ECB             : 
     317                 : /* COMMIT callback */
     318                 : static void
     319 GIC         415 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     320 ECB             :                      XLogRecPtr commit_lsn)
     321                 : {
     322 GIC         415 :     TestDecodingData *data = ctx->output_plugin_private;
     323 CBC         415 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     324             415 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     325                 : 
     326             415 :     pfree(txndata);
     327             415 :     txn->output_plugin_private = NULL;
     328 ECB             : 
     329 GIC         415 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     330 CBC         173 :         return;
     331                 : 
     332             242 :     OutputPluginPrepareWrite(ctx, true);
     333             242 :     if (data->include_xids)
     334 GIC          25 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     335                 :     else
     336 CBC         217 :         appendStringInfoString(ctx->out, "COMMIT");
     337                 : 
     338 GIC         242 :     if (data->include_timestamp)
     339               1 :         appendStringInfo(ctx->out, " (at %s)",
     340                 :                          timestamptz_to_str(txn->xact_time.commit_time));
     341 ECB             : 
     342 GIC         242 :     OutputPluginWrite(ctx, true);
     343 ECB             : }
     344                 : 
     345                 : /* BEGIN PREPARE callback */
     346                 : static void
     347 CBC           6 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     348 ECB             : {
     349 GIC           6 :     TestDecodingData *data = ctx->output_plugin_private;
     350                 :     TestDecodingTxnData *txndata =
     351               6 :     MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     352                 : 
     353               6 :     txndata->xact_wrote_changes = false;
     354 CBC           6 :     txn->output_plugin_private = txndata;
     355 ECB             : 
     356                 :     /*
     357 EUB             :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     358                 :      * where the first operation is received for this transaction.
     359                 :      */
     360 GIC           6 :     if (data->skip_empty_xacts)
     361               6 :         return;
     362 ECB             : 
     363 UIC           0 :     pg_output_begin(ctx, data, txn, true);
     364                 : }
     365 ECB             : 
     366                 : /* PREPARE callback */
     367                 : static void
     368 GIC           6 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     369                 :                       XLogRecPtr prepare_lsn)
     370                 : {
     371               6 :     TestDecodingData *data = ctx->output_plugin_private;
     372 CBC           6 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     373 EUB             : 
     374                 :     /*
     375 ECB             :      * If asked to skip empty transactions, we'll emit PREPARE at the point
     376                 :      * where the first operation is received for this transaction.
     377                 :      */
     378 CBC           6 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     379 UIC           0 :         return;
     380 ECB             : 
     381 GBC           6 :     OutputPluginPrepareWrite(ctx, true);
     382                 : 
     383 CBC           6 :     appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
     384 GBC           6 :                      quote_literal_cstr(txn->gid));
     385                 : 
     386 GIC           6 :     if (data->include_xids)
     387 LBC           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     388                 : 
     389 GIC           6 :     if (data->include_timestamp)
     390 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
     391                 :                          timestamptz_to_str(txn->xact_time.prepare_time));
     392 ECB             : 
     393 GIC           6 :     OutputPluginWrite(ctx, true);
     394                 : }
     395 ECB             : 
     396                 : /* COMMIT PREPARED callback */
     397                 : static void
     398 GIC           6 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     399 ECB             :                               XLogRecPtr commit_lsn)
     400                 : {
     401 GIC           6 :     TestDecodingData *data = ctx->output_plugin_private;
     402 ECB             : 
     403 GBC           6 :     OutputPluginPrepareWrite(ctx, true);
     404                 : 
     405 CBC           6 :     appendStringInfo(ctx->out, "COMMIT PREPARED %s",
     406 GBC           6 :                      quote_literal_cstr(txn->gid));
     407                 : 
     408 GIC           6 :     if (data->include_xids)
     409 LBC           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     410 ECB             : 
     411 GIC           6 :     if (data->include_timestamp)
     412 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
     413                 :                          timestamptz_to_str(txn->xact_time.commit_time));
     414 ECB             : 
     415 GIC           6 :     OutputPluginWrite(ctx, true);
     416               6 : }
     417                 : 
     418                 : /* ROLLBACK PREPARED callback */
     419 ECB             : static void
     420 GIC           1 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
     421 ECB             :                                 ReorderBufferTXN *txn,
     422                 :                                 XLogRecPtr prepare_end_lsn,
     423                 :                                 TimestampTz prepare_time)
     424                 : {
     425 GIC           1 :     TestDecodingData *data = ctx->output_plugin_private;
     426 ECB             : 
     427 GBC           1 :     OutputPluginPrepareWrite(ctx, true);
     428                 : 
     429 CBC           1 :     appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
     430 GBC           1 :                      quote_literal_cstr(txn->gid));
     431                 : 
     432 GIC           1 :     if (data->include_xids)
     433 LBC           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     434 ECB             : 
     435 GIC           1 :     if (data->include_timestamp)
     436 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
     437                 :                          timestamptz_to_str(txn->xact_time.commit_time));
     438                 : 
     439 GIC           1 :     OutputPluginWrite(ctx, true);
     440               1 : }
     441                 : 
     442                 : /*
     443                 :  * Filter out two-phase transactions.
     444 ECB             :  *
     445                 :  * Each plugin can implement its own filtering logic. Here we demonstrate a
     446                 :  * simple logic by checking the GID. If the GID contains the "_nodecode"
     447                 :  * substring, then we filter it out.
     448                 :  */
     449                 : static bool
     450 CBC         115 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
     451                 :                          const char *gid)
     452                 : {
     453 GIC         115 :     if (strstr(gid, "_nodecode") != NULL)
     454 CBC           8 :         return true;
     455                 : 
     456 GIC         107 :     return false;
     457 ECB             : }
     458                 : 
     459                 : static bool
     460 CBC     1194875 : pg_decode_filter(LogicalDecodingContext *ctx,
     461 ECB             :                  RepOriginId origin_id)
     462                 : {
     463 GIC     1194875 :     TestDecodingData *data = ctx->output_plugin_private;
     464                 : 
     465         1194875 :     if (data->only_local && origin_id != InvalidRepOriginId)
     466               9 :         return true;
     467         1194866 :     return false;
     468                 : }
     469                 : 
     470                 : /*
     471                 :  * Print literal `outputstr' already represented as string of type `typid'
     472 ECB             :  * into stringbuf `s'.
     473                 :  *
     474                 :  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
     475                 :  * if standard_conforming_strings were enabled.
     476                 :  */
     477                 : static void
     478 CBC      175907 : print_literal(StringInfo s, Oid typid, char *outputstr)
     479                 : {
     480                 :     const char *valptr;
     481                 : 
     482 GIC      175907 :     switch (typid)
     483                 :     {
     484           60217 :         case INT2OID:
     485                 :         case INT4OID:
     486 ECB             :         case INT8OID:
     487                 :         case OIDOID:
     488                 :         case FLOAT4OID:
     489 EUB             :         case FLOAT8OID:
     490                 :         case NUMERICOID:
     491                 :             /* NB: We don't care about Inf, NaN et al. */
     492 GBC       60217 :             appendStringInfoString(s, outputstr);
     493 GIC       60217 :             break;
     494 EUB             : 
     495 UBC           0 :         case BITOID:
     496 EUB             :         case VARBITOID:
     497 UIC           0 :             appendStringInfo(s, "B'%s'", outputstr);
     498 UBC           0 :             break;
     499 EUB             : 
     500 UIC           0 :         case BOOLOID:
     501 LBC           0 :             if (strcmp(outputstr, "t") == 0)
     502               0 :                 appendStringInfoString(s, "true");
     503 ECB             :             else
     504 UIC           0 :                 appendStringInfoString(s, "false");
     505 LBC           0 :             break;
     506                 : 
     507 CBC      115690 :         default:
     508          115690 :             appendStringInfoChar(s, '\'');
     509         5427135 :             for (valptr = outputstr; *valptr; valptr++)
     510                 :             {
     511         5311445 :                 char        ch = *valptr;
     512 ECB             : 
     513 GIC     5311445 :                 if (SQL_STR_DOUBLE(ch, false))
     514 CBC          64 :                     appendStringInfoChar(s, ch);
     515 GIC     5311445 :                 appendStringInfoChar(s, ch);
     516                 :             }
     517          115690 :             appendStringInfoChar(s, '\'');
     518 CBC      115690 :             break;
     519                 :     }
     520 GIC      175907 : }
     521                 : 
     522                 : /* print the tuple 'tuple' into the StringInfo s */
     523 ECB             : static void
     524 GIC      145521 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     525                 : {
     526                 :     int         natt;
     527                 : 
     528                 :     /* print all columns individually */
     529          347185 :     for (natt = 0; natt < tupdesc->natts; natt++)
     530                 :     {
     531                 :         Form_pg_attribute attr; /* the attribute itself */
     532 ECB             :         Oid         typid;      /* type of current attribute */
     533                 :         Oid         typoutput;  /* output function */
     534                 :         bool        typisvarlena;
     535                 :         Datum       origval;    /* possibly toasted Datum */
     536                 :         bool        isnull;     /* column is null? */
     537                 : 
     538 CBC      201664 :         attr = TupleDescAttr(tupdesc, natt);
     539 ECB             : 
     540                 :         /*
     541                 :          * don't print dropped columns, we can't be sure everything is
     542                 :          * available for them
     543                 :          */
     544 GIC      201664 :         if (attr->attisdropped)
     545 CBC        5130 :             continue;
     546 EUB             : 
     547                 :         /*
     548 ECB             :          * Don't print system columns, oid will already have been printed if
     549                 :          * present.
     550                 :          */
     551 CBC      201592 :         if (attr->attnum < 0)
     552 UIC           0 :             continue;
     553 ECB             : 
     554 CBC      201592 :         typid = attr->atttypid;
     555                 : 
     556                 :         /* get Datum from tuple */
     557          201592 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     558 ECB             : 
     559 GIC      201592 :         if (isnull && skip_nulls)
     560            5058 :             continue;
     561 ECB             : 
     562                 :         /* print attribute name */
     563 CBC      196534 :         appendStringInfoChar(s, ' ');
     564 GIC      196534 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     565                 : 
     566 ECB             :         /* print attribute type */
     567 GIC      196534 :         appendStringInfoChar(s, '[');
     568          196534 :         appendStringInfoString(s, format_type_be(typid));
     569          196534 :         appendStringInfoChar(s, ']');
     570 ECB             : 
     571                 :         /* query output function */
     572 GIC      196534 :         getTypeOutputInfo(typid,
     573 ECB             :                           &typoutput, &typisvarlena);
     574                 : 
     575                 :         /* print separator */
     576 CBC      196534 :         appendStringInfoChar(s, ':');
     577 ECB             : 
     578                 :         /* print data */
     579 GIC      196534 :         if (isnull)
     580           20615 :             appendStringInfoString(s, "null");
     581          175919 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
     582              12 :             appendStringInfoString(s, "unchanged-toast-datum");
     583          175907 :         else if (!typisvarlena)
     584 CBC       60221 :             print_literal(s, typid,
     585 ECB             :                           OidOutputFunctionCall(typoutput, origval));
     586                 :         else
     587                 :         {
     588                 :             Datum       val;    /* definitely detoasted Datum */
     589                 : 
     590 GIC      115686 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     591          115686 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     592                 :         }
     593                 :     }
     594 CBC      145521 : }
     595                 : 
     596                 : /*
     597                 :  * callback for individual changed tuples
     598                 :  */
     599                 : static void
     600 GIC      150510 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     601                 :                  Relation relation, ReorderBufferChange *change)
     602                 : {
     603 ECB             :     TestDecodingData *data;
     604                 :     TestDecodingTxnData *txndata;
     605                 :     Form_pg_class class_form;
     606                 :     TupleDesc   tupdesc;
     607                 :     MemoryContext old;
     608                 : 
     609 CBC      150510 :     data = ctx->output_plugin_private;
     610 GIC      150510 :     txndata = txn->output_plugin_private;
     611 ECB             : 
     612                 :     /* output BEGIN if we haven't yet */
     613 CBC      150510 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     614 ECB             :     {
     615 GIC         212 :         pg_output_begin(ctx, data, txn, false);
     616                 :     }
     617 CBC      150510 :     txndata->xact_wrote_changes = true;
     618                 : 
     619          150510 :     class_form = RelationGetForm(relation);
     620 GIC      150510 :     tupdesc = RelationGetDescr(relation);
     621 ECB             : 
     622                 :     /* Avoid leaking memory by using and resetting our own context */
     623 CBC      150510 :     old = MemoryContextSwitchTo(data->context);
     624 ECB             : 
     625 CBC      150510 :     OutputPluginPrepareWrite(ctx, true);
     626                 : 
     627          150510 :     appendStringInfoString(ctx->out, "table ");
     628 GIC      150510 :     appendStringInfoString(ctx->out,
     629 CBC      150510 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     630 GIC      150510 :                                                       class_form->relrewrite ?
     631 CBC           1 :                                                       get_rel_name(class_form->relrewrite) :
     632 ECB             :                                                       NameStr(class_form->relname)));
     633 CBC      150510 :     appendStringInfoChar(ctx->out, ':');
     634 EUB             : 
     635 GIC      150510 :     switch (change->action)
     636 ECB             :     {
     637 CBC      132945 :         case REORDER_BUFFER_CHANGE_INSERT:
     638 GIC      132945 :             appendStringInfoString(ctx->out, " INSERT:");
     639 CBC      132945 :             if (change->data.tp.newtuple == NULL)
     640 LBC           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     641 ECB             :             else
     642 CBC      132945 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     643 GIC      132945 :                                     &change->data.tp.newtuple->tuple,
     644 ECB             :                                     false);
     645 CBC      132945 :             break;
     646            7543 :         case REORDER_BUFFER_CHANGE_UPDATE:
     647 GIC        7543 :             appendStringInfoString(ctx->out, " UPDATE:");
     648 CBC        7543 :             if (change->data.tp.oldtuple != NULL)
     649                 :             {
     650 GIC          18 :                 appendStringInfoString(ctx->out, " old-key:");
     651 CBC          18 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     652 GBC          18 :                                     &change->data.tp.oldtuple->tuple,
     653                 :                                     true);
     654 CBC          18 :                 appendStringInfoString(ctx->out, " new-tuple:");
     655 ECB             :             }
     656                 : 
     657 CBC        7543 :             if (change->data.tp.newtuple == NULL)
     658 LBC           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     659 ECB             :             else
     660 GIC        7543 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     661            7543 :                                     &change->data.tp.newtuple->tuple,
     662 ECB             :                                     false);
     663 CBC        7543 :             break;
     664 GIC       10022 :         case REORDER_BUFFER_CHANGE_DELETE:
     665           10022 :             appendStringInfoString(ctx->out, " DELETE:");
     666 ECB             : 
     667                 :             /* if there was no PK, we only know that a delete happened */
     668 GIC       10022 :             if (change->data.tp.oldtuple == NULL)
     669 CBC        5007 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     670 EUB             :             /* In DELETE, only the replica identity is present; display that */
     671                 :             else
     672 GIC        5015 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     673            5015 :                                     &change->data.tp.oldtuple->tuple,
     674 ECB             :                                     true);
     675 CBC       10022 :             break;
     676 UIC           0 :         default:
     677 LBC           0 :             Assert(false);
     678 ECB             :     }
     679                 : 
     680 GIC      150510 :     MemoryContextSwitchTo(old);
     681 CBC      150510 :     MemoryContextReset(data->context);
     682                 : 
     683 GIC      150510 :     OutputPluginWrite(ctx, true);
     684          150510 : }
     685                 : 
     686                 : static void
     687               6 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     688                 :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     689 ECB             : {
     690                 :     TestDecodingData *data;
     691                 :     TestDecodingTxnData *txndata;
     692                 :     MemoryContext old;
     693                 :     int         i;
     694                 : 
     695 CBC           6 :     data = ctx->output_plugin_private;
     696 GIC           6 :     txndata = txn->output_plugin_private;
     697 ECB             : 
     698                 :     /* output BEGIN if we haven't yet */
     699 GIC           6 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     700 ECB             :     {
     701 GIC           6 :         pg_output_begin(ctx, data, txn, false);
     702 ECB             :     }
     703 GIC           6 :     txndata->xact_wrote_changes = true;
     704 ECB             : 
     705                 :     /* Avoid leaking memory by using and resetting our own context */
     706 CBC           6 :     old = MemoryContextSwitchTo(data->context);
     707                 : 
     708               6 :     OutputPluginPrepareWrite(ctx, true);
     709 ECB             : 
     710 GIC           6 :     appendStringInfoString(ctx->out, "table ");
     711 ECB             : 
     712 CBC          13 :     for (i = 0; i < nrelations; i++)
     713 ECB             :     {
     714 GIC           7 :         if (i > 0)
     715               1 :             appendStringInfoString(ctx->out, ", ");
     716 ECB             : 
     717 GIC           7 :         appendStringInfoString(ctx->out,
     718 CBC           7 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     719               7 :                                                           NameStr(relations[i]->rd_rel->relname)));
     720                 :     }
     721 ECB             : 
     722 CBC           6 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     723 ECB             : 
     724 CBC           6 :     if (change->data.truncate.restart_seqs
     725 GIC           5 :         || change->data.truncate.cascade)
     726                 :     {
     727 CBC           1 :         if (change->data.truncate.restart_seqs)
     728 GIC           1 :             appendStringInfoString(ctx->out, " restart_seqs");
     729 CBC           1 :         if (change->data.truncate.cascade)
     730               1 :             appendStringInfoString(ctx->out, " cascade");
     731                 :     }
     732 ECB             :     else
     733 CBC           5 :         appendStringInfoString(ctx->out, " (no-flags)");
     734                 : 
     735 GIC           6 :     MemoryContextSwitchTo(old);
     736 CBC           6 :     MemoryContextReset(data->context);
     737                 : 
     738 GIC           6 :     OutputPluginWrite(ctx, true);
     739               6 : }
     740 ECB             : 
     741                 : static void
     742 GIC           8 : pg_decode_message(LogicalDecodingContext *ctx,
     743 ECB             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     744                 :                   const char *prefix, Size sz, const char *message)
     745                 : {
     746 GIC           8 :     OutputPluginPrepareWrite(ctx, true);
     747               8 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     748 ECB             :                      transactional, prefix, sz);
     749 GIC           8 :     appendBinaryStringInfo(ctx->out, message, sz);
     750               8 :     OutputPluginWrite(ctx, true);
     751 CBC           8 : }
     752 ECB             : 
     753                 : static void
     754 GIC          11 : pg_decode_stream_start(LogicalDecodingContext *ctx,
     755                 :                        ReorderBufferTXN *txn)
     756                 : {
     757 CBC          11 :     TestDecodingData *data = ctx->output_plugin_private;
     758 GIC          11 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     759                 : 
     760 ECB             :     /*
     761                 :      * Allocate the txn plugin data for the first stream in the transaction.
     762                 :      */
     763 GIC          11 :     if (txndata == NULL)
     764                 :     {
     765 ECB             :         txndata =
     766 CBC           7 :             MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     767               7 :         txndata->xact_wrote_changes = false;
     768 GBC           7 :         txn->output_plugin_private = txndata;
     769                 :     }
     770                 : 
     771 GIC          11 :     txndata->stream_wrote_changes = false;
     772 CBC          11 :     if (data->skip_empty_xacts)
     773 GIC          11 :         return;
     774 LBC           0 :     pg_output_stream_start(ctx, data, txn, true);
     775 ECB             : }
     776 EUB             : 
     777                 : static void
     778 CBC           6 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     779 ECB             : {
     780 CBC           6 :     OutputPluginPrepareWrite(ctx, last_write);
     781 GIC           6 :     if (data->include_xids)
     782 UIC           0 :         appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
     783 ECB             :     else
     784 GIC           6 :         appendStringInfoString(ctx->out, "opening a streamed block for transaction");
     785               6 :     OutputPluginWrite(ctx, last_write);
     786 CBC           6 : }
     787 ECB             : 
     788                 : static void
     789 CBC          11 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
     790 ECB             :                       ReorderBufferTXN *txn)
     791                 : {
     792 CBC          11 :     TestDecodingData *data = ctx->output_plugin_private;
     793              11 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     794 EUB             : 
     795 GIC          11 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     796 CBC           5 :         return;
     797 ECB             : 
     798 GIC           6 :     OutputPluginPrepareWrite(ctx, true);
     799               6 :     if (data->include_xids)
     800 UIC           0 :         appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
     801 ECB             :     else
     802 GIC           6 :         appendStringInfoString(ctx->out, "closing a streamed block for transaction");
     803               6 :     OutputPluginWrite(ctx, true);
     804                 : }
     805 ECB             : 
     806                 : static void
     807 GIC           3 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
     808                 :                        ReorderBufferTXN *txn,
     809                 :                        XLogRecPtr abort_lsn)
     810                 : {
     811               3 :     TestDecodingData *data = ctx->output_plugin_private;
     812 ECB             : 
     813                 :     /*
     814                 :      * stream abort can be sent for an individual subtransaction but we
     815                 :      * maintain the output_plugin_private only under the toptxn so if this is
     816                 :      * not the toptxn then fetch the toptxn.
     817                 :      */
     818 GNC           3 :     ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
     819 GBC           3 :     TestDecodingTxnData *txndata = toptxn->output_plugin_private;
     820               3 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     821                 : 
     822 GNC           3 :     if (rbtxn_is_toptxn(txn))
     823 ECB             :     {
     824 LBC           0 :         Assert(txn->output_plugin_private != NULL);
     825 UIC           0 :         pfree(txndata);
     826 UBC           0 :         txn->output_plugin_private = NULL;
     827 EUB             :     }
     828                 : 
     829 GIC           3 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     830 GBC           3 :         return;
     831 EUB             : 
     832 UIC           0 :     OutputPluginPrepareWrite(ctx, true);
     833               0 :     if (data->include_xids)
     834               0 :         appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
     835 ECB             :     else
     836 UIC           0 :         appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
     837               0 :     OutputPluginWrite(ctx, true);
     838                 : }
     839 ECB             : 
     840                 : static void
     841 GIC           1 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     842 ECB             :                          ReorderBufferTXN *txn,
     843 EUB             :                          XLogRecPtr prepare_lsn)
     844                 : {
     845 CBC           1 :     TestDecodingData *data = ctx->output_plugin_private;
     846 GIC           1 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     847 ECB             : 
     848 GBC           1 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     849 UBC           0 :         return;
     850                 : 
     851 CBC           1 :     OutputPluginPrepareWrite(ctx, true);
     852 ECB             : 
     853 GIC           1 :     if (data->include_xids)
     854 LBC           0 :         appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
     855 UBC           0 :                          quote_literal_cstr(txn->gid), txn->xid);
     856                 :     else
     857 GIC           1 :         appendStringInfo(ctx->out, "preparing streamed transaction %s",
     858 CBC           1 :                          quote_literal_cstr(txn->gid));
     859                 : 
     860 GIC           1 :     if (data->include_timestamp)
     861 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
     862 ECB             :                          timestamptz_to_str(txn->xact_time.prepare_time));
     863                 : 
     864 GIC           1 :     OutputPluginWrite(ctx, true);
     865                 : }
     866 ECB             : 
     867                 : static void
     868 CBC           4 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
     869                 :                         ReorderBufferTXN *txn,
     870 ECB             :                         XLogRecPtr commit_lsn)
     871                 : {
     872 GIC           4 :     TestDecodingData *data = ctx->output_plugin_private;
     873 CBC           4 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     874 GBC           4 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     875                 : 
     876 CBC           4 :     pfree(txndata);
     877 GIC           4 :     txn->output_plugin_private = NULL;
     878 ECB             : 
     879 GBC           4 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     880 UIC           0 :         return;
     881 ECB             : 
     882 GIC           4 :     OutputPluginPrepareWrite(ctx, true);
     883 ECB             : 
     884 GBC           4 :     if (data->include_xids)
     885 UIC           0 :         appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
     886                 :     else
     887 CBC           4 :         appendStringInfoString(ctx->out, "committing streamed transaction");
     888                 : 
     889 GIC           4 :     if (data->include_timestamp)
     890 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
     891                 :                          timestamptz_to_str(txn->xact_time.commit_time));
     892                 : 
     893 GIC           4 :     OutputPluginWrite(ctx, true);
     894                 : }
     895                 : 
     896 ECB             : /*
     897                 :  * In streaming mode, we don't display the changes as the transaction can abort
     898                 :  * at a later point in time.  We don't want users to see the changes until the
     899                 :  * transaction is committed.
     900                 :  */
     901                 : static void
     902 CBC          63 : pg_decode_stream_change(LogicalDecodingContext *ctx,
     903                 :                         ReorderBufferTXN *txn,
     904                 :                         Relation relation,
     905 ECB             :                         ReorderBufferChange *change)
     906                 : {
     907 CBC          63 :     TestDecodingData *data = ctx->output_plugin_private;
     908 GIC          63 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     909 ECB             : 
     910                 :     /* output stream start if we haven't yet */
     911 CBC          63 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     912 ECB             :     {
     913 GBC           6 :         pg_output_stream_start(ctx, data, txn, false);
     914                 :     }
     915 CBC          63 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     916 ECB             : 
     917 CBC          63 :     OutputPluginPrepareWrite(ctx, true);
     918 GIC          63 :     if (data->include_xids)
     919 UIC           0 :         appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
     920                 :     else
     921 GIC          63 :         appendStringInfoString(ctx->out, "streaming change for transaction");
     922              63 :     OutputPluginWrite(ctx, true);
     923              63 : }
     924                 : 
     925 ECB             : /*
     926                 :  * In streaming mode, we don't display the contents for transactional messages
     927                 :  * as the transaction can abort at a later point in time.  We don't want users to
     928                 :  * see the message contents until the transaction is committed.
     929                 :  */
     930                 : static void
     931 CBC           3 : pg_decode_stream_message(LogicalDecodingContext *ctx,
     932                 :                          ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     933 ECB             :                          const char *prefix, Size sz, const char *message)
     934                 : {
     935 GIC           3 :     OutputPluginPrepareWrite(ctx, true);
     936                 : 
     937               3 :     if (transactional)
     938 EUB             :     {
     939 GIC           3 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
     940 EUB             :                          transactional, prefix, sz);
     941                 :     }
     942                 :     else
     943 ECB             :     {
     944 LBC           0 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
     945                 :                          transactional, prefix, sz);
     946 UIC           0 :         appendBinaryStringInfo(ctx->out, message, sz);
     947                 :     }
     948                 : 
     949 GIC           3 :     OutputPluginWrite(ctx, true);
     950               3 : }
     951 EUB             : 
     952                 : /*
     953                 :  * In streaming mode, we don't display the detailed information of Truncate.
     954                 :  * See pg_decode_stream_change.
     955                 :  */
     956                 : static void
     957 UIC           0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     958 EUB             :                           int nrelations, Relation relations[],
     959                 :                           ReorderBufferChange *change)
     960                 : {
     961 UIC           0 :     TestDecodingData *data = ctx->output_plugin_private;
     962 UBC           0 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     963                 : 
     964               0 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     965 EUB             :     {
     966 UBC           0 :         pg_output_stream_start(ctx, data, txn, false);
     967                 :     }
     968               0 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     969 EUB             : 
     970 UBC           0 :     OutputPluginPrepareWrite(ctx, true);
     971 UIC           0 :     if (data->include_xids)
     972               0 :         appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
     973                 :     else
     974               0 :         appendStringInfoString(ctx->out, "streaming truncate for transaction");
     975               0 :     OutputPluginWrite(ctx, true);
     976               0 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a