LCOV - differential code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 83.4 % 403 336 14 43 10 22 171 2 141 34 179 1 2
Current Date: 2023-04-08 17:13:01 Functions: 96.4 % 28 27 1 26 1 1 26
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 100.0 % 2 2 2
Legend: Lines: hit not hit (240..) days: 83.3 % 401 334 14 43 10 22 171 141 34 179
Function coverage date bins:
(240..) days: 49.1 % 55 27 1 26 1 1 26

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * test_decoding.c
                                  4                 :  *        example logical decoding output plugin
                                  5                 :  *
                                  6                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *        contrib/test_decoding/test_decoding.c
                                 10                 :  *
                                 11                 :  *-------------------------------------------------------------------------
                                 12                 :  */
                                 13                 : #include "postgres.h"
                                 14                 : 
                                 15                 : #include "catalog/pg_type.h"
                                 16                 : 
                                 17                 : #include "replication/logical.h"
                                 18                 : #include "replication/origin.h"
                                 19                 : 
                                 20                 : #include "utils/builtins.h"
                                 21                 : #include "utils/lsyscache.h"
                                 22                 : #include "utils/memutils.h"
                                 23                 : #include "utils/rel.h"
                                 24                 : 
 3324 rhaas                      25 CBC          92 : PG_MODULE_MAGIC;
                                 26                 : 
                                 27                 : typedef struct
                                 28                 : {
                                 29                 :     MemoryContext context;
                                 30                 :     bool        include_xids;
                                 31                 :     bool        include_timestamp;
                                 32                 :     bool        skip_empty_xacts;
                                 33                 :     bool        only_local;
                                 34                 : } TestDecodingData;
                                 35                 : 
                                 36                 : /*
                                 37                 :  * Maintain the per-transaction level variables to track whether the
                                 38                 :  * transaction and or streams have written any changes. In streaming mode the
                                 39                 :  * transaction can be decoded in streams so along with maintaining whether the
                                 40                 :  * transaction has written any changes, we also need to track whether the
                                 41                 :  * current stream has written any changes. This is required so that if user
                                 42                 :  * has requested to skip the empty transactions we can skip the empty streams
                                 43                 :  * even though the transaction has written some changes.
                                 44                 :  */
                                 45                 : typedef struct
                                 46                 : {
                                 47                 :     bool        xact_wrote_changes;
                                 48                 :     bool        stream_wrote_changes;
                                 49                 : } TestDecodingTxnData;
                                 50                 : 
                                 51                 : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                 52                 :                               bool is_init);
                                 53                 : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
                                 54                 : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
                                 55                 :                                 ReorderBufferTXN *txn);
                                 56                 : static void pg_output_begin(LogicalDecodingContext *ctx,
                                 57                 :                             TestDecodingData *data,
                                 58                 :                             ReorderBufferTXN *txn,
                                 59                 :                             bool last_write);
                                 60                 : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
                                 61                 :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
                                 62                 : static void pg_decode_change(LogicalDecodingContext *ctx,
                                 63                 :                              ReorderBufferTXN *txn, Relation relation,
                                 64                 :                              ReorderBufferChange *change);
                                 65                 : static void pg_decode_truncate(LogicalDecodingContext *ctx,
                                 66                 :                                ReorderBufferTXN *txn,
                                 67                 :                                int nrelations, Relation relations[],
                                 68                 :                                ReorderBufferChange *change);
                                 69                 : static bool pg_decode_filter(LogicalDecodingContext *ctx,
                                 70                 :                              RepOriginId origin_id);
                                 71                 : static void pg_decode_message(LogicalDecodingContext *ctx,
                                 72                 :                               ReorderBufferTXN *txn, XLogRecPtr lsn,
                                 73                 :                               bool transactional, const char *prefix,
                                 74                 :                               Size sz, const char *message);
                                 75                 : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
                                 76                 :                                      TransactionId xid,
                                 77                 :                                      const char *gid);
                                 78                 : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
                                 79                 :                                         ReorderBufferTXN *txn);
                                 80                 : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
                                 81                 :                                   ReorderBufferTXN *txn,
                                 82                 :                                   XLogRecPtr prepare_lsn);
                                 83                 : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
                                 84                 :                                           ReorderBufferTXN *txn,
                                 85                 :                                           XLogRecPtr commit_lsn);
                                 86                 : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
                                 87                 :                                             ReorderBufferTXN *txn,
                                 88                 :                                             XLogRecPtr prepare_end_lsn,
                                 89                 :                                             TimestampTz prepare_time);
                                 90                 : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
                                 91                 :                                    ReorderBufferTXN *txn);
                                 92                 : static void pg_output_stream_start(LogicalDecodingContext *ctx,
                                 93                 :                                    TestDecodingData *data,
                                 94                 :                                    ReorderBufferTXN *txn,
                                 95                 :                                    bool last_write);
                                 96                 : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
                                 97                 :                                   ReorderBufferTXN *txn);
                                 98                 : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                 99                 :                                    ReorderBufferTXN *txn,
                                100                 :                                    XLogRecPtr abort_lsn);
                                101                 : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
                                102                 :                                      ReorderBufferTXN *txn,
                                103                 :                                      XLogRecPtr prepare_lsn);
                                104                 : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                105                 :                                     ReorderBufferTXN *txn,
                                106                 :                                     XLogRecPtr commit_lsn);
                                107                 : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
                                108                 :                                     ReorderBufferTXN *txn,
                                109                 :                                     Relation relation,
                                110                 :                                     ReorderBufferChange *change);
                                111                 : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
                                112                 :                                      ReorderBufferTXN *txn, XLogRecPtr lsn,
                                113                 :                                      bool transactional, const char *prefix,
                                114                 :                                      Size sz, const char *message);
                                115                 : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
                                116                 :                                       ReorderBufferTXN *txn,
  985 akapila                   117 ECB             :                                       int nrelations, Relation relations[],
                                118                 :                                       ReorderBufferChange *change);
                                119                 : 
 3324 rhaas                     120                 : void
 3324 rhaas                     121 GIC          92 : _PG_init(void)
                                122                 : {
                                123                 :     /* other plugins can perform things here */
 3324 rhaas                     124 CBC          92 : }
                                125                 : 
 3324 rhaas                     126 ECB             : /* specify output plugin callbacks */
                                127                 : void
 3324 rhaas                     128 CBC         304 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
 3324 rhaas                     129 ECB             : {
 3324 rhaas                     130 CBC         304 :     cb->startup_cb = pg_decode_startup;
                                131             304 :     cb->begin_cb = pg_decode_begin_txn;
                                132             304 :     cb->change_cb = pg_decode_change;
 1828 peter_e                   133             304 :     cb->truncate_cb = pg_decode_truncate;
 3324 rhaas                     134             304 :     cb->commit_cb = pg_decode_commit_txn;
 2902 andres                    135             304 :     cb->filter_by_origin_cb = pg_decode_filter;
 3324 rhaas                     136             304 :     cb->shutdown_cb = pg_decode_shutdown;
 2559 simon                     137             304 :     cb->message_cb = pg_decode_message;
  830 akapila                   138             304 :     cb->filter_prepare_cb = pg_decode_filter_prepare;
                                139             304 :     cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
                                140             304 :     cb->prepare_cb = pg_decode_prepare_txn;
                                141             304 :     cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
                                142             304 :     cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
  985                           143             304 :     cb->stream_start_cb = pg_decode_stream_start;
                                144             304 :     cb->stream_stop_cb = pg_decode_stream_stop;
                                145             304 :     cb->stream_abort_cb = pg_decode_stream_abort;
  830 akapila                   146 GIC         304 :     cb->stream_prepare_cb = pg_decode_stream_prepare;
  985                           147             304 :     cb->stream_commit_cb = pg_decode_stream_commit;
                                148             304 :     cb->stream_change_cb = pg_decode_stream_change;
                                149             304 :     cb->stream_message_cb = pg_decode_stream_message;
  985 akapila                   150 CBC         304 :     cb->stream_truncate_cb = pg_decode_stream_truncate;
 3324 rhaas                     151 GIC         304 : }
                                152                 : 
                                153                 : 
                                154                 : /* initialize this plugin */
 3324 rhaas                     155 ECB             : static void
 3324 rhaas                     156 GIC         304 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 3324 rhaas                     157 ECB             :                   bool is_init)
                                158                 : {
                                159                 :     ListCell   *option;
                                160                 :     TestDecodingData *data;
  974 akapila                   161 CBC         304 :     bool        enable_streaming = false;
 3324 rhaas                     162 ECB             : 
 3142 andres                    163 CBC         304 :     data = palloc0(sizeof(TestDecodingData));
 3324 rhaas                     164             304 :     data->context = AllocSetContextCreate(ctx->context,
                                165                 :                                           "text conversion context",
 2416 tgl                       166 ECB             :                                           ALLOCSET_DEFAULT_SIZES);
 3324 rhaas                     167 GIC         304 :     data->include_xids = true;
 3324 rhaas                     168 CBC         304 :     data->include_timestamp = false;
 3142 andres                    169             304 :     data->skip_empty_xacts = false;
 2902 andres                    170 GIC         304 :     data->only_local = false;
 3324 rhaas                     171 ECB             : 
 3324 rhaas                     172 GIC         304 :     ctx->output_plugin_private = data;
 3324 rhaas                     173 ECB             : 
 3324 rhaas                     174 GIC         304 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 1845 peter_e                   175 CBC         304 :     opt->receive_rewrites = false;
                                176                 : 
 3324 rhaas                     177             641 :     foreach(option, ctx->output_plugin_options)
                                178                 :     {
 3324 rhaas                     179 GIC         340 :         DefElem    *elem = lfirst(option);
 3324 rhaas                     180 ECB             : 
 3324 rhaas                     181 GBC         340 :         Assert(elem->arg == NULL || IsA(elem->arg, String));
 3324 rhaas                     182 ECB             : 
 3324 rhaas                     183 CBC         340 :         if (strcmp(elem->defname, "include-xids") == 0)
                                184                 :         {
                                185                 :             /* if option does not provide a value, it means its value is true */
 3324 rhaas                     186 GIC         161 :             if (elem->arg == NULL)
 3324 rhaas                     187 UIC           0 :                 data->include_xids = true;
 3324 rhaas                     188 CBC         161 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
 3324 rhaas                     189 GIC           2 :                 ereport(ERROR,
 3324 rhaas                     190 ECB             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 2118 tgl                       191 EUB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
 2118 tgl                       192 ECB             :                                 strVal(elem->arg), elem->defname)));
 3324 rhaas                     193 EUB             :         }
 3324 rhaas                     194 GIC         179 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
                                195                 :         {
                                196               1 :             if (elem->arg == NULL)
 3324 rhaas                     197 UIC           0 :                 data->include_timestamp = true;
 3324 rhaas                     198 CBC           1 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
 3324 rhaas                     199 UIC           0 :                 ereport(ERROR,
                                200                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                201                 :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
 2118 tgl                       202 ECB             :                                 strVal(elem->arg), elem->defname)));
 3324 rhaas                     203 EUB             :         }
 3324 rhaas                     204 CBC         178 :         else if (strcmp(elem->defname, "force-binary") == 0)
 3324 rhaas                     205 EUB             :         {
                                206                 :             bool        force_binary;
                                207                 : 
 3324 rhaas                     208 GIC           6 :             if (elem->arg == NULL)
 3324 rhaas                     209 UIC           0 :                 continue;
 3324 rhaas                     210 CBC           6 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
 3324 rhaas                     211 LBC           0 :                 ereport(ERROR,
                                212                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 2118 tgl                       213 ECB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                214                 :                                 strVal(elem->arg), elem->defname)));
                                215                 : 
 3324 rhaas                     216 CBC           6 :             if (force_binary)
 3324 rhaas                     217 GBC           2 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
 3324 rhaas                     218 ECB             :         }
 3142 andres                    219 GBC         172 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
                                220                 :         {
                                221                 : 
 3142 andres                    222 GIC         159 :             if (elem->arg == NULL)
 3142 andres                    223 UIC           0 :                 data->skip_empty_xacts = true;
 3142 andres                    224 CBC         159 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
 3142 andres                    225 UIC           0 :                 ereport(ERROR,
                                226                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 2118 tgl                       227 ECB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
 2118 tgl                       228 EUB             :                                 strVal(elem->arg), elem->defname)));
 3142 andres                    229 ECB             :         }
 2902 andres                    230 GBC          13 :         else if (strcmp(elem->defname, "only-local") == 0)
                                231                 :         {
                                232                 : 
 2902 andres                    233 GIC           3 :             if (elem->arg == NULL)
 2902 andres                    234 UIC           0 :                 data->only_local = true;
 2902 andres                    235 CBC           3 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
 2902 andres                    236 UIC           0 :                 ereport(ERROR,
                                237                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 2118 tgl                       238 ECB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
 2118 tgl                       239 EUB             :                                 strVal(elem->arg), elem->defname)));
 2902 andres                    240 ECB             :         }
 1845 peter_e                   241 GBC          10 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
                                242                 :         {
                                243                 : 
 1845 peter_e                   244 GIC           1 :             if (elem->arg == NULL)
 1845 peter_e                   245 UIC           0 :                 continue;
 1845 peter_e                   246 CBC           1 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
 1845 peter_e                   247 UIC           0 :                 ereport(ERROR,
 1845 peter_e                   248 ECB             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 1845 peter_e                   249 EUB             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
 1845 peter_e                   250 ECB             :                                 strVal(elem->arg), elem->defname)));
 1845 peter_e                   251 EUB             :         }
  974 akapila                   252 GIC           9 :         else if (strcmp(elem->defname, "stream-changes") == 0)
                                253                 :         {
                                254               8 :             if (elem->arg == NULL)
  974 akapila                   255 UIC           0 :                 continue;
  974 akapila                   256 GIC           8 :             else if (!parse_bool(strVal(elem->arg), &enable_streaming))
  974 akapila                   257 UIC           0 :                 ereport(ERROR,
  974 akapila                   258 ECB             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                259                 :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                260                 :                                 strVal(elem->arg), elem->defname)));
                                261                 :         }
                                262                 :         else
                                263                 :         {
 3324 rhaas                     264 GIC           1 :             ereport(ERROR,
                                265                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 3324 rhaas                     266 ECB             :                      errmsg("option \"%s\" = \"%s\" is unknown",
                                267                 :                             elem->defname,
                                268                 :                             elem->arg ? strVal(elem->arg) : "(null)")));
                                269                 :         }
                                270                 :     }
  974 akapila                   271                 : 
  974 akapila                   272 GIC         301 :     ctx->streaming &= enable_streaming;
 3324 rhaas                     273 CBC         301 : }
                                274                 : 
                                275                 : /* cleanup this plugin's resources */
 3324 rhaas                     276 ECB             : static void
 3324 rhaas                     277 CBC         290 : pg_decode_shutdown(LogicalDecodingContext *ctx)
                                278                 : {
 3324 rhaas                     279 GIC         290 :     TestDecodingData *data = ctx->output_plugin_private;
                                280                 : 
 3324 rhaas                     281 ECB             :     /* cleanup our own resources via memory context reset */
 3324 rhaas                     282 GIC         290 :     MemoryContextDelete(data->context);
 3324 rhaas                     283 CBC         290 : }
                                284                 : 
 3324 rhaas                     285 ECB             : /* BEGIN callback */
                                286                 : static void
 3324 rhaas                     287 CBC         415 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 3324 rhaas                     288 ECB             : {
 3324 rhaas                     289 GIC         415 :     TestDecodingData *data = ctx->output_plugin_private;
                                290                 :     TestDecodingTxnData *txndata =
  873 akapila                   291             415 :     MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
                                292                 : 
                                293             415 :     txndata->xact_wrote_changes = false;
  873 akapila                   294 CBC         415 :     txn->output_plugin_private = txndata;
 3324 rhaas                     295 ECB             : 
                                296                 :     /*
  332 tgl                       297                 :      * If asked to skip empty transactions, we'll emit BEGIN at the point
                                298                 :      * where the first operation is received for this transaction.
                                299                 :      */
 3142 andres                    300 GIC         415 :     if (data->skip_empty_xacts)
 3142 andres                    301 CBC         385 :         return;
                                302                 : 
                                303              30 :     pg_output_begin(ctx, data, txn, true);
 3142 andres                    304 ECB             : }
                                305                 : 
                                306                 : static void
 3142 andres                    307 CBC         248 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 3142 andres                    308 ECB             : {
 3142 andres                    309 CBC         248 :     OutputPluginPrepareWrite(ctx, last_write);
 3324 rhaas                     310 GIC         248 :     if (data->include_xids)
                                311              25 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
                                312                 :     else
 3324 rhaas                     313 CBC         223 :         appendStringInfoString(ctx->out, "BEGIN");
 3142 andres                    314 GIC         248 :     OutputPluginWrite(ctx, last_write);
 3324 rhaas                     315             248 : }
 3324 rhaas                     316 ECB             : 
                                317                 : /* COMMIT callback */
                                318                 : static void
 3324 rhaas                     319 GIC         415 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 3324 rhaas                     320 ECB             :                      XLogRecPtr commit_lsn)
                                321                 : {
 3324 rhaas                     322 GIC         415 :     TestDecodingData *data = ctx->output_plugin_private;
  873 akapila                   323 CBC         415 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
                                324             415 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
                                325                 : 
                                326             415 :     pfree(txndata);
                                327             415 :     txn->output_plugin_private = NULL;
 3324 rhaas                     328 ECB             : 
  873 akapila                   329 GIC         415 :     if (data->skip_empty_xacts && !xact_wrote_changes)
 3142 andres                    330 CBC         173 :         return;
                                331                 : 
 3324 rhaas                     332             242 :     OutputPluginPrepareWrite(ctx, true);
                                333             242 :     if (data->include_xids)
 3324 rhaas                     334 GIC          25 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
                                335                 :     else
 3324 rhaas                     336 CBC         217 :         appendStringInfoString(ctx->out, "COMMIT");
                                337                 : 
 3324 rhaas                     338 GIC         242 :     if (data->include_timestamp)
                                339               1 :         appendStringInfo(ctx->out, " (at %s)",
                                340                 :                          timestamptz_to_str(txn->xact_time.commit_time));
 3324 rhaas                     341 ECB             : 
 3324 rhaas                     342 GIC         242 :     OutputPluginWrite(ctx, true);
 3324 rhaas                     343 ECB             : }
                                344                 : 
  830 akapila                   345                 : /* BEGIN PREPARE callback */
                                346                 : static void
  830 akapila                   347 CBC           6 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
  830 akapila                   348 ECB             : {
  830 akapila                   349 GIC           6 :     TestDecodingData *data = ctx->output_plugin_private;
                                350                 :     TestDecodingTxnData *txndata =
                                351               6 :     MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
                                352                 : 
                                353               6 :     txndata->xact_wrote_changes = false;
  830 akapila                   354 CBC           6 :     txn->output_plugin_private = txndata;
  830 akapila                   355 ECB             : 
                                356                 :     /*
  332 tgl                       357 EUB             :      * If asked to skip empty transactions, we'll emit BEGIN at the point
                                358                 :      * where the first operation is received for this transaction.
                                359                 :      */
  830 akapila                   360 GIC           6 :     if (data->skip_empty_xacts)
                                361               6 :         return;
  830 akapila                   362 ECB             : 
  830 akapila                   363 UIC           0 :     pg_output_begin(ctx, data, txn, true);
                                364                 : }
  830 akapila                   365 ECB             : 
                                366                 : /* PREPARE callback */
                                367                 : static void
  830 akapila                   368 GIC           6 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                369                 :                       XLogRecPtr prepare_lsn)
                                370                 : {
                                371               6 :     TestDecodingData *data = ctx->output_plugin_private;
  830 akapila                   372 CBC           6 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
  830 akapila                   373 EUB             : 
                                374                 :     /*
  382 alvherre                  375 ECB             :      * If asked to skip empty transactions, we'll emit PREPARE at the point
                                376                 :      * where the first operation is received for this transaction.
                                377                 :      */
  830 akapila                   378 CBC           6 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
  830 akapila                   379 UIC           0 :         return;
  830 akapila                   380 ECB             : 
  830 akapila                   381 GBC           6 :     OutputPluginPrepareWrite(ctx, true);
                                382                 : 
  830 akapila                   383 CBC           6 :     appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
  830 akapila                   384 GBC           6 :                      quote_literal_cstr(txn->gid));
                                385                 : 
  830 akapila                   386 GIC           6 :     if (data->include_xids)
  830 akapila                   387 LBC           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
                                388                 : 
  830 akapila                   389 GIC           6 :     if (data->include_timestamp)
  830 akapila                   390 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
                                391                 :                          timestamptz_to_str(txn->xact_time.prepare_time));
  830 akapila                   392 ECB             : 
  830 akapila                   393 GIC           6 :     OutputPluginWrite(ctx, true);
                                394                 : }
  830 akapila                   395 ECB             : 
                                396                 : /* COMMIT PREPARED callback */
                                397                 : static void
  830 akapila                   398 GIC           6 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  830 akapila                   399 ECB             :                               XLogRecPtr commit_lsn)
                                400                 : {
  830 akapila                   401 GIC           6 :     TestDecodingData *data = ctx->output_plugin_private;
  830 akapila                   402 ECB             : 
  830 akapila                   403 GBC           6 :     OutputPluginPrepareWrite(ctx, true);
                                404                 : 
  830 akapila                   405 CBC           6 :     appendStringInfo(ctx->out, "COMMIT PREPARED %s",
  830 akapila                   406 GBC           6 :                      quote_literal_cstr(txn->gid));
                                407                 : 
  830 akapila                   408 GIC           6 :     if (data->include_xids)
  830 akapila                   409 LBC           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
  830 akapila                   410 ECB             : 
  830 akapila                   411 GIC           6 :     if (data->include_timestamp)
  830 akapila                   412 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
                                413                 :                          timestamptz_to_str(txn->xact_time.commit_time));
  830 akapila                   414 ECB             : 
  830 akapila                   415 GIC           6 :     OutputPluginWrite(ctx, true);
                                416               6 : }
                                417                 : 
                                418                 : /* ROLLBACK PREPARED callback */
  830 akapila                   419 ECB             : static void
  830 akapila                   420 GIC           1 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  830 akapila                   421 ECB             :                                 ReorderBufferTXN *txn,
                                422                 :                                 XLogRecPtr prepare_end_lsn,
                                423                 :                                 TimestampTz prepare_time)
                                424                 : {
  830 akapila                   425 GIC           1 :     TestDecodingData *data = ctx->output_plugin_private;
  830 akapila                   426 ECB             : 
  830 akapila                   427 GBC           1 :     OutputPluginPrepareWrite(ctx, true);
                                428                 : 
  830 akapila                   429 CBC           1 :     appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
  830 akapila                   430 GBC           1 :                      quote_literal_cstr(txn->gid));
                                431                 : 
  830 akapila                   432 GIC           1 :     if (data->include_xids)
  830 akapila                   433 LBC           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
  830 akapila                   434 ECB             : 
  830 akapila                   435 GIC           1 :     if (data->include_timestamp)
  830 akapila                   436 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
                                437                 :                          timestamptz_to_str(txn->xact_time.commit_time));
                                438                 : 
  830 akapila                   439 GIC           1 :     OutputPluginWrite(ctx, true);
                                440               1 : }
                                441                 : 
                                442                 : /*
                                443                 :  * Filter out two-phase transactions.
  830 akapila                   444 ECB             :  *
                                445                 :  * Each plugin can implement its own filtering logic. Here we demonstrate a
                                446                 :  * simple logic by checking the GID. If the GID contains the "_nodecode"
                                447                 :  * substring, then we filter it out.
                                448                 :  */
                                449                 : static bool
  740 akapila                   450 CBC         115 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
                                451                 :                          const char *gid)
                                452                 : {
  830 akapila                   453 GIC         115 :     if (strstr(gid, "_nodecode") != NULL)
  830 akapila                   454 CBC           8 :         return true;
                                455                 : 
  830 akapila                   456 GIC         107 :     return false;
  830 akapila                   457 ECB             : }
                                458                 : 
 2902 andres                    459                 : static bool
 2902 andres                    460 CBC     1194875 : pg_decode_filter(LogicalDecodingContext *ctx,
 2902 andres                    461 ECB             :                  RepOriginId origin_id)
                                462                 : {
 2902 andres                    463 GIC     1194875 :     TestDecodingData *data = ctx->output_plugin_private;
                                464                 : 
                                465         1194875 :     if (data->only_local && origin_id != InvalidRepOriginId)
                                466               9 :         return true;
                                467         1194866 :     return false;
                                468                 : }
                                469                 : 
                                470                 : /*
                                471                 :  * Print literal `outputstr' already represented as string of type `typid'
 3324 rhaas                     472 ECB             :  * into stringbuf `s'.
                                473                 :  *
                                474                 :  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
                                475                 :  * if standard_conforming_strings were enabled.
                                476                 :  */
                                477                 : static void
 3324 rhaas                     478 CBC      175907 : print_literal(StringInfo s, Oid typid, char *outputstr)
                                479                 : {
                                480                 :     const char *valptr;
                                481                 : 
 3324 rhaas                     482 GIC      175907 :     switch (typid)
                                483                 :     {
                                484           60217 :         case INT2OID:
                                485                 :         case INT4OID:
 3324 rhaas                     486 ECB             :         case INT8OID:
                                487                 :         case OIDOID:
                                488                 :         case FLOAT4OID:
 3324 rhaas                     489 EUB             :         case FLOAT8OID:
                                490                 :         case NUMERICOID:
                                491                 :             /* NB: We don't care about Inf, NaN et al. */
 3324 rhaas                     492 GBC       60217 :             appendStringInfoString(s, outputstr);
 3324 rhaas                     493 GIC       60217 :             break;
 3324 rhaas                     494 EUB             : 
 3324 rhaas                     495 UBC           0 :         case BITOID:
 3324 rhaas                     496 EUB             :         case VARBITOID:
 3324 rhaas                     497 UIC           0 :             appendStringInfo(s, "B'%s'", outputstr);
 3324 rhaas                     498 UBC           0 :             break;
 3324 rhaas                     499 EUB             : 
 3324 rhaas                     500 UIC           0 :         case BOOLOID:
 3324 rhaas                     501 LBC           0 :             if (strcmp(outputstr, "t") == 0)
                                502               0 :                 appendStringInfoString(s, "true");
 3324 rhaas                     503 ECB             :             else
 3324 rhaas                     504 UIC           0 :                 appendStringInfoString(s, "false");
 3324 rhaas                     505 LBC           0 :             break;
                                506                 : 
 3324 rhaas                     507 CBC      115690 :         default:
                                508          115690 :             appendStringInfoChar(s, '\'');
                                509         5427135 :             for (valptr = outputstr; *valptr; valptr++)
                                510                 :             {
                                511         5311445 :                 char        ch = *valptr;
 3324 rhaas                     512 ECB             : 
 3324 rhaas                     513 GIC     5311445 :                 if (SQL_STR_DOUBLE(ch, false))
 3324 rhaas                     514 CBC          64 :                     appendStringInfoChar(s, ch);
 3324 rhaas                     515 GIC     5311445 :                 appendStringInfoChar(s, ch);
                                516                 :             }
                                517          115690 :             appendStringInfoChar(s, '\'');
 3324 rhaas                     518 CBC      115690 :             break;
                                519                 :     }
 3324 rhaas                     520 GIC      175907 : }
                                521                 : 
                                522                 : /* print the tuple 'tuple' into the StringInfo s */
 3324 rhaas                     523 ECB             : static void
 3324 rhaas                     524 GIC      145521 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
                                525                 : {
                                526                 :     int         natt;
                                527                 : 
                                528                 :     /* print all columns individually */
                                529          347185 :     for (natt = 0; natt < tupdesc->natts; natt++)
                                530                 :     {
                                531                 :         Form_pg_attribute attr; /* the attribute itself */
 3324 rhaas                     532 ECB             :         Oid         typid;      /* type of current attribute */
                                533                 :         Oid         typoutput;  /* output function */
                                534                 :         bool        typisvarlena;
                                535                 :         Datum       origval;    /* possibly toasted Datum */
                                536                 :         bool        isnull;     /* column is null? */
                                537                 : 
 2058 andres                    538 CBC      201664 :         attr = TupleDescAttr(tupdesc, natt);
 3324 rhaas                     539 ECB             : 
                                540                 :         /*
                                541                 :          * don't print dropped columns, we can't be sure everything is
                                542                 :          * available for them
                                543                 :          */
 3324 rhaas                     544 GIC      201664 :         if (attr->attisdropped)
 3324 rhaas                     545 CBC        5130 :             continue;
 3324 rhaas                     546 EUB             : 
                                547                 :         /*
 3324 rhaas                     548 ECB             :          * Don't print system columns, oid will already have been printed if
                                549                 :          * present.
                                550                 :          */
 3324 rhaas                     551 CBC      201592 :         if (attr->attnum < 0)
 3324 rhaas                     552 UIC           0 :             continue;
 3324 rhaas                     553 ECB             : 
 3324 rhaas                     554 CBC      201592 :         typid = attr->atttypid;
                                555                 : 
                                556                 :         /* get Datum from tuple */
 2843 andres                    557          201592 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
 3324 rhaas                     558 ECB             : 
 3324 rhaas                     559 GIC      201592 :         if (isnull && skip_nulls)
                                560            5058 :             continue;
 3324 rhaas                     561 ECB             : 
                                562                 :         /* print attribute name */
 3324 rhaas                     563 CBC      196534 :         appendStringInfoChar(s, ' ');
 3324 rhaas                     564 GIC      196534 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
                                565                 : 
 3324 rhaas                     566 ECB             :         /* print attribute type */
 3324 rhaas                     567 GIC      196534 :         appendStringInfoChar(s, '[');
                                568          196534 :         appendStringInfoString(s, format_type_be(typid));
                                569          196534 :         appendStringInfoChar(s, ']');
 3324 rhaas                     570 ECB             : 
                                571                 :         /* query output function */
 3324 rhaas                     572 GIC      196534 :         getTypeOutputInfo(typid,
 3324 rhaas                     573 ECB             :                           &typoutput, &typisvarlena);
                                574                 : 
                                575                 :         /* print separator */
 3324 rhaas                     576 CBC      196534 :         appendStringInfoChar(s, ':');
 3324 rhaas                     577 ECB             : 
                                578                 :         /* print data */
 3324 rhaas                     579 GIC      196534 :         if (isnull)
                                580           20615 :             appendStringInfoString(s, "null");
                                581          175919 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
                                582              12 :             appendStringInfoString(s, "unchanged-toast-datum");
                                583          175907 :         else if (!typisvarlena)
 3324 rhaas                     584 CBC       60221 :             print_literal(s, typid,
 3324 rhaas                     585 ECB             :                           OidOutputFunctionCall(typoutput, origval));
                                586                 :         else
                                587                 :         {
 3260 bruce                     588                 :             Datum       val;    /* definitely detoasted Datum */
                                589                 : 
 3324 rhaas                     590 GIC      115686 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
                                591          115686 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
                                592                 :         }
                                593                 :     }
 3324 rhaas                     594 CBC      145521 : }
                                595                 : 
                                596                 : /*
                                597                 :  * callback for individual changed tuples
                                598                 :  */
                                599                 : static void
 3324 rhaas                     600 GIC      150510 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                601                 :                  Relation relation, ReorderBufferChange *change)
                                602                 : {
 3324 rhaas                     603 ECB             :     TestDecodingData *data;
  873 akapila                   604                 :     TestDecodingTxnData *txndata;
                                605                 :     Form_pg_class class_form;
                                606                 :     TupleDesc   tupdesc;
 3324 rhaas                     607                 :     MemoryContext old;
                                608                 : 
 3324 rhaas                     609 CBC      150510 :     data = ctx->output_plugin_private;
  873 akapila                   610 GIC      150510 :     txndata = txn->output_plugin_private;
 3142 andres                    611 ECB             : 
                                612                 :     /* output BEGIN if we haven't yet */
  873 akapila                   613 CBC      150510 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 3142 andres                    614 ECB             :     {
 3142 andres                    615 GIC         212 :         pg_output_begin(ctx, data, txn, false);
                                616                 :     }
  873 akapila                   617 CBC      150510 :     txndata->xact_wrote_changes = true;
                                618                 : 
 3324 rhaas                     619          150510 :     class_form = RelationGetForm(relation);
 3324 rhaas                     620 GIC      150510 :     tupdesc = RelationGetDescr(relation);
 3324 rhaas                     621 ECB             : 
                                622                 :     /* Avoid leaking memory by using and resetting our own context */
 3324 rhaas                     623 CBC      150510 :     old = MemoryContextSwitchTo(data->context);
 3324 rhaas                     624 ECB             : 
 3324 rhaas                     625 CBC      150510 :     OutputPluginPrepareWrite(ctx, true);
                                626                 : 
                                627          150510 :     appendStringInfoString(ctx->out, "table ");
 3324 rhaas                     628 GIC      150510 :     appendStringInfoString(ctx->out,
 1165 alvherre                  629 CBC      150510 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
 1845 peter_e                   630 GIC      150510 :                                                       class_form->relrewrite ?
 1845 peter_e                   631 CBC           1 :                                                       get_rel_name(class_form->relrewrite) :
 2118 tgl                       632 ECB             :                                                       NameStr(class_form->relname)));
 2890 peter_e                   633 CBC      150510 :     appendStringInfoChar(ctx->out, ':');
 3324 rhaas                     634 EUB             : 
 3324 rhaas                     635 GIC      150510 :     switch (change->action)
 3324 rhaas                     636 ECB             :     {
 3324 rhaas                     637 CBC      132945 :         case REORDER_BUFFER_CHANGE_INSERT:
 3324 rhaas                     638 GIC      132945 :             appendStringInfoString(ctx->out, " INSERT:");
 3320 tgl                       639 CBC      132945 :             if (change->data.tp.newtuple == NULL)
 3324 rhaas                     640 LBC           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
 3324 rhaas                     641 ECB             :             else
 3324 rhaas                     642 CBC      132945 :                 tuple_to_stringinfo(ctx->out, tupdesc,
 3320 tgl                       643 GIC      132945 :                                     &change->data.tp.newtuple->tuple,
 3324 rhaas                     644 ECB             :                                     false);
 3324 rhaas                     645 CBC      132945 :             break;
                                646            7543 :         case REORDER_BUFFER_CHANGE_UPDATE:
 3324 rhaas                     647 GIC        7543 :             appendStringInfoString(ctx->out, " UPDATE:");
 3320 tgl                       648 CBC        7543 :             if (change->data.tp.oldtuple != NULL)
                                649                 :             {
 3324 rhaas                     650 GIC          18 :                 appendStringInfoString(ctx->out, " old-key:");
 3324 rhaas                     651 CBC          18 :                 tuple_to_stringinfo(ctx->out, tupdesc,
 3320 tgl                       652 GBC          18 :                                     &change->data.tp.oldtuple->tuple,
                                653                 :                                     true);
 3324 rhaas                     654 CBC          18 :                 appendStringInfoString(ctx->out, " new-tuple:");
 3324 rhaas                     655 ECB             :             }
                                656                 : 
 3320 tgl                       657 CBC        7543 :             if (change->data.tp.newtuple == NULL)
 3324 rhaas                     658 LBC           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
 3324 rhaas                     659 ECB             :             else
 3324 rhaas                     660 GIC        7543 :                 tuple_to_stringinfo(ctx->out, tupdesc,
 3320 tgl                       661            7543 :                                     &change->data.tp.newtuple->tuple,
 3324 rhaas                     662 ECB             :                                     false);
 3324 rhaas                     663 CBC        7543 :             break;
 3324 rhaas                     664 GIC       10022 :         case REORDER_BUFFER_CHANGE_DELETE:
                                665           10022 :             appendStringInfoString(ctx->out, " DELETE:");
 3324 rhaas                     666 ECB             : 
                                667                 :             /* if there was no PK, we only know that a delete happened */
 3320 tgl                       668 GIC       10022 :             if (change->data.tp.oldtuple == NULL)
 3324 rhaas                     669 CBC        5007 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
 3324 rhaas                     670 EUB             :             /* In DELETE, only the replica identity is present; display that */
                                671                 :             else
 3324 rhaas                     672 GIC        5015 :                 tuple_to_stringinfo(ctx->out, tupdesc,
 3320 tgl                       673            5015 :                                     &change->data.tp.oldtuple->tuple,
 3324 rhaas                     674 ECB             :                                     true);
 3324 rhaas                     675 CBC       10022 :             break;
 3320 tgl                       676 UIC           0 :         default:
 3320 tgl                       677 LBC           0 :             Assert(false);
 3324 rhaas                     678 ECB             :     }
                                679                 : 
 3324 rhaas                     680 GIC      150510 :     MemoryContextSwitchTo(old);
 3324 rhaas                     681 CBC      150510 :     MemoryContextReset(data->context);
                                682                 : 
 3324 rhaas                     683 GIC      150510 :     OutputPluginWrite(ctx, true);
                                684          150510 : }
                                685                 : 
                                686                 : static void
 1828 peter_e                   687               6 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                688                 :                    int nrelations, Relation relations[], ReorderBufferChange *change)
 1828 peter_e                   689 ECB             : {
                                690                 :     TestDecodingData *data;
                                691                 :     TestDecodingTxnData *txndata;
                                692                 :     MemoryContext old;
                                693                 :     int         i;
                                694                 : 
 1828 peter_e                   695 CBC           6 :     data = ctx->output_plugin_private;
  873 akapila                   696 GIC           6 :     txndata = txn->output_plugin_private;
 1828 peter_e                   697 ECB             : 
                                698                 :     /* output BEGIN if we haven't yet */
  873 akapila                   699 GIC           6 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 1828 peter_e                   700 ECB             :     {
 1828 peter_e                   701 GIC           6 :         pg_output_begin(ctx, data, txn, false);
 1828 peter_e                   702 ECB             :     }
  873 akapila                   703 GIC           6 :     txndata->xact_wrote_changes = true;
 1828 peter_e                   704 ECB             : 
                                705                 :     /* Avoid leaking memory by using and resetting our own context */
 1828 peter_e                   706 CBC           6 :     old = MemoryContextSwitchTo(data->context);
                                707                 : 
                                708               6 :     OutputPluginPrepareWrite(ctx, true);
 1828 peter_e                   709 ECB             : 
 1828 peter_e                   710 GIC           6 :     appendStringInfoString(ctx->out, "table ");
 1828 peter_e                   711 ECB             : 
 1828 peter_e                   712 CBC          13 :     for (i = 0; i < nrelations; i++)
 1828 peter_e                   713 ECB             :     {
 1828 peter_e                   714 GIC           7 :         if (i > 0)
                                715               1 :             appendStringInfoString(ctx->out, ", ");
 1828 peter_e                   716 ECB             : 
 1828 peter_e                   717 GIC           7 :         appendStringInfoString(ctx->out,
 1828 peter_e                   718 CBC           7 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
                                719               7 :                                                           NameStr(relations[i]->rd_rel->relname)));
                                720                 :     }
 1828 peter_e                   721 ECB             : 
 1828 peter_e                   722 CBC           6 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
 1828 peter_e                   723 ECB             : 
 1828 peter_e                   724 CBC           6 :     if (change->data.truncate.restart_seqs
 1828 peter_e                   725 GIC           5 :         || change->data.truncate.cascade)
                                726                 :     {
 1828 peter_e                   727 CBC           1 :         if (change->data.truncate.restart_seqs)
 1375 drowley                   728 GIC           1 :             appendStringInfoString(ctx->out, " restart_seqs");
 1828 peter_e                   729 CBC           1 :         if (change->data.truncate.cascade)
 1375 drowley                   730               1 :             appendStringInfoString(ctx->out, " cascade");
                                731                 :     }
 1828 peter_e                   732 ECB             :     else
 1828 peter_e                   733 CBC           5 :         appendStringInfoString(ctx->out, " (no-flags)");
                                734                 : 
 1828 peter_e                   735 GIC           6 :     MemoryContextSwitchTo(old);
 1828 peter_e                   736 CBC           6 :     MemoryContextReset(data->context);
                                737                 : 
 1828 peter_e                   738 GIC           6 :     OutputPluginWrite(ctx, true);
                                739               6 : }
 1828 peter_e                   740 ECB             : 
 2559 simon                     741                 : static void
 2559 simon                     742 GIC           8 : pg_decode_message(LogicalDecodingContext *ctx,
 2559 simon                     743 ECB             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
                                744                 :                   const char *prefix, Size sz, const char *message)
                                745                 : {
 2559 simon                     746 GIC           8 :     OutputPluginPrepareWrite(ctx, true);
                                747               8 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 2559 simon                     748 ECB             :                      transactional, prefix, sz);
 2559 simon                     749 GIC           8 :     appendBinaryStringInfo(ctx->out, message, sz);
                                750               8 :     OutputPluginWrite(ctx, true);
 2559 simon                     751 CBC           8 : }
  985 akapila                   752 ECB             : 
                                753                 : static void
  985 akapila                   754 GIC          11 : pg_decode_stream_start(LogicalDecodingContext *ctx,
                                755                 :                        ReorderBufferTXN *txn)
                                756                 : {
  985 akapila                   757 CBC          11 :     TestDecodingData *data = ctx->output_plugin_private;
  873 akapila                   758 GIC          11 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
                                759                 : 
  873 akapila                   760 ECB             :     /*
                                761                 :      * Allocate the txn plugin data for the first stream in the transaction.
                                762                 :      */
  873 akapila                   763 GIC          11 :     if (txndata == NULL)
                                764                 :     {
  873 akapila                   765 ECB             :         txndata =
  873 akapila                   766 CBC           7 :             MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
                                767               7 :         txndata->xact_wrote_changes = false;
  873 akapila                   768 GBC           7 :         txn->output_plugin_private = txndata;
                                769                 :     }
                                770                 : 
  873 akapila                   771 GIC          11 :     txndata->stream_wrote_changes = false;
  940 akapila                   772 CBC          11 :     if (data->skip_empty_xacts)
  940 akapila                   773 GIC          11 :         return;
  940 akapila                   774 LBC           0 :     pg_output_stream_start(ctx, data, txn, true);
  940 akapila                   775 ECB             : }
  940 akapila                   776 EUB             : 
                                777                 : static void
  940 akapila                   778 CBC           6 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
  940 akapila                   779 ECB             : {
  940 akapila                   780 CBC           6 :     OutputPluginPrepareWrite(ctx, last_write);
  985 akapila                   781 GIC           6 :     if (data->include_xids)
  985 akapila                   782 UIC           0 :         appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
  985 akapila                   783 ECB             :     else
  906 drowley                   784 GIC           6 :         appendStringInfoString(ctx->out, "opening a streamed block for transaction");
  940 akapila                   785               6 :     OutputPluginWrite(ctx, last_write);
  985 akapila                   786 CBC           6 : }
  985 akapila                   787 ECB             : 
                                788                 : static void
  985 akapila                   789 CBC          11 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
  985 akapila                   790 ECB             :                       ReorderBufferTXN *txn)
                                791                 : {
  985 akapila                   792 CBC          11 :     TestDecodingData *data = ctx->output_plugin_private;
  873                           793              11 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
  985 akapila                   794 EUB             : 
  873 akapila                   795 GIC          11 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
  940 akapila                   796 CBC           5 :         return;
  940 akapila                   797 ECB             : 
  985 akapila                   798 GIC           6 :     OutputPluginPrepareWrite(ctx, true);
                                799               6 :     if (data->include_xids)
  985 akapila                   800 UIC           0 :         appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
  985 akapila                   801 ECB             :     else
  906 drowley                   802 GIC           6 :         appendStringInfoString(ctx->out, "closing a streamed block for transaction");
  985 akapila                   803               6 :     OutputPluginWrite(ctx, true);
                                804                 : }
  985 akapila                   805 ECB             : 
                                806                 : static void
  985 akapila                   807 GIC           3 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                808                 :                        ReorderBufferTXN *txn,
                                809                 :                        XLogRecPtr abort_lsn)
                                810                 : {
                                811               3 :     TestDecodingData *data = ctx->output_plugin_private;
  985 akapila                   812 ECB             : 
  873                           813                 :     /*
                                814                 :      * stream abort can be sent for an individual subtransaction but we
                                815                 :      * maintain the output_plugin_private only under the toptxn so if this is
                                816                 :      * not the toptxn then fetch the toptxn.
                                817                 :      */
   23 akapila                   818 GNC           3 :     ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
  873 akapila                   819 GBC           3 :     TestDecodingTxnData *txndata = toptxn->output_plugin_private;
                                820               3 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
                                821                 : 
   23 akapila                   822 GNC           3 :     if (rbtxn_is_toptxn(txn))
  873 akapila                   823 ECB             :     {
  873 akapila                   824 LBC           0 :         Assert(txn->output_plugin_private != NULL);
  873 akapila                   825 UIC           0 :         pfree(txndata);
  873 akapila                   826 UBC           0 :         txn->output_plugin_private = NULL;
  873 akapila                   827 EUB             :     }
                                828                 : 
  873 akapila                   829 GIC           3 :     if (data->skip_empty_xacts && !xact_wrote_changes)
  940 akapila                   830 GBC           3 :         return;
  940 akapila                   831 EUB             : 
  985 akapila                   832 UIC           0 :     OutputPluginPrepareWrite(ctx, true);
                                833               0 :     if (data->include_xids)
                                834               0 :         appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
  985 akapila                   835 ECB             :     else
  906 drowley                   836 UIC           0 :         appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
  985 akapila                   837               0 :     OutputPluginWrite(ctx, true);
                                838                 : }
  985 akapila                   839 ECB             : 
  830                           840                 : static void
  830 akapila                   841 GIC           1 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
  830 akapila                   842 ECB             :                          ReorderBufferTXN *txn,
  830 akapila                   843 EUB             :                          XLogRecPtr prepare_lsn)
                                844                 : {
  830 akapila                   845 CBC           1 :     TestDecodingData *data = ctx->output_plugin_private;
  830 akapila                   846 GIC           1 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
  830 akapila                   847 ECB             : 
  830 akapila                   848 GBC           1 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
  830 akapila                   849 UBC           0 :         return;
                                850                 : 
  830 akapila                   851 CBC           1 :     OutputPluginPrepareWrite(ctx, true);
  830 akapila                   852 ECB             : 
  830 akapila                   853 GIC           1 :     if (data->include_xids)
  830 akapila                   854 LBC           0 :         appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
  830 akapila                   855 UBC           0 :                          quote_literal_cstr(txn->gid), txn->xid);
                                856                 :     else
  830 akapila                   857 GIC           1 :         appendStringInfo(ctx->out, "preparing streamed transaction %s",
  830 akapila                   858 CBC           1 :                          quote_literal_cstr(txn->gid));
                                859                 : 
  830 akapila                   860 GIC           1 :     if (data->include_timestamp)
  830 akapila                   861 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
  634 akapila                   862 ECB             :                          timestamptz_to_str(txn->xact_time.prepare_time));
                                863                 : 
  830 akapila                   864 GIC           1 :     OutputPluginWrite(ctx, true);
                                865                 : }
  830 akapila                   866 ECB             : 
  985                           867                 : static void
  985 akapila                   868 CBC           4 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                869                 :                         ReorderBufferTXN *txn,
  985 akapila                   870 ECB             :                         XLogRecPtr commit_lsn)
                                871                 : {
  985 akapila                   872 GIC           4 :     TestDecodingData *data = ctx->output_plugin_private;
  873 akapila                   873 CBC           4 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
  873 akapila                   874 GBC           4 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
                                875                 : 
  873 akapila                   876 CBC           4 :     pfree(txndata);
  873 akapila                   877 GIC           4 :     txn->output_plugin_private = NULL;
  985 akapila                   878 ECB             : 
  873 akapila                   879 GBC           4 :     if (data->skip_empty_xacts && !xact_wrote_changes)
  940 akapila                   880 UIC           0 :         return;
  940 akapila                   881 ECB             : 
  985 akapila                   882 GIC           4 :     OutputPluginPrepareWrite(ctx, true);
  985 akapila                   883 ECB             : 
  985 akapila                   884 GBC           4 :     if (data->include_xids)
  985 akapila                   885 UIC           0 :         appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
                                886                 :     else
  906 drowley                   887 CBC           4 :         appendStringInfoString(ctx->out, "committing streamed transaction");
                                888                 : 
  985 akapila                   889 GIC           4 :     if (data->include_timestamp)
  985 akapila                   890 UIC           0 :         appendStringInfo(ctx->out, " (at %s)",
                                891                 :                          timestamptz_to_str(txn->xact_time.commit_time));
                                892                 : 
  985 akapila                   893 GIC           4 :     OutputPluginWrite(ctx, true);
                                894                 : }
                                895                 : 
  985 akapila                   896 ECB             : /*
                                897                 :  * In streaming mode, we don't display the changes as the transaction can abort
                                898                 :  * at a later point in time.  We don't want users to see the changes until the
                                899                 :  * transaction is committed.
                                900                 :  */
                                901                 : static void
  985 akapila                   902 CBC          63 : pg_decode_stream_change(LogicalDecodingContext *ctx,
                                903                 :                         ReorderBufferTXN *txn,
                                904                 :                         Relation relation,
  985 akapila                   905 ECB             :                         ReorderBufferChange *change)
                                906                 : {
  985 akapila                   907 CBC          63 :     TestDecodingData *data = ctx->output_plugin_private;
  873 akapila                   908 GIC          63 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
  985 akapila                   909 ECB             : 
                                910                 :     /* output stream start if we haven't yet */
  873 akapila                   911 CBC          63 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
  940 akapila                   912 ECB             :     {
  940 akapila                   913 GBC           6 :         pg_output_stream_start(ctx, data, txn, false);
                                914                 :     }
  873 akapila                   915 CBC          63 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
  940 akapila                   916 ECB             : 
  985 akapila                   917 CBC          63 :     OutputPluginPrepareWrite(ctx, true);
  985 akapila                   918 GIC          63 :     if (data->include_xids)
  985 akapila                   919 UIC           0 :         appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
                                920                 :     else
  906 drowley                   921 GIC          63 :         appendStringInfoString(ctx->out, "streaming change for transaction");
  985 akapila                   922              63 :     OutputPluginWrite(ctx, true);
                                923              63 : }
                                924                 : 
  985 akapila                   925 ECB             : /*
                                926                 :  * In streaming mode, we don't display the contents for transactional messages
                                927                 :  * as the transaction can abort at a later point in time.  We don't want users to
                                928                 :  * see the message contents until the transaction is committed.
                                929                 :  */
                                930                 : static void
  985 akapila                   931 CBC           3 : pg_decode_stream_message(LogicalDecodingContext *ctx,
                                932                 :                          ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
  985 akapila                   933 ECB             :                          const char *prefix, Size sz, const char *message)
                                934                 : {
  985 akapila                   935 GIC           3 :     OutputPluginPrepareWrite(ctx, true);
                                936                 : 
                                937               3 :     if (transactional)
  985 akapila                   938 EUB             :     {
  985 akapila                   939 GIC           3 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
  985 akapila                   940 EUB             :                          transactional, prefix, sz);
                                941                 :     }
                                942                 :     else
  985 akapila                   943 ECB             :     {
  985 akapila                   944 LBC           0 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
                                945                 :                          transactional, prefix, sz);
  985 akapila                   946 UIC           0 :         appendBinaryStringInfo(ctx->out, message, sz);
                                947                 :     }
                                948                 : 
  985 akapila                   949 GIC           3 :     OutputPluginWrite(ctx, true);
                                950               3 : }
  985 akapila                   951 EUB             : 
                                952                 : /*
                                953                 :  * In streaming mode, we don't display the detailed information of Truncate.
                                954                 :  * See pg_decode_stream_change.
                                955                 :  */
                                956                 : static void
  985 akapila                   957 UIC           0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  985 akapila                   958 EUB             :                           int nrelations, Relation relations[],
                                959                 :                           ReorderBufferChange *change)
                                960                 : {
  985 akapila                   961 UIC           0 :     TestDecodingData *data = ctx->output_plugin_private;
  873 akapila                   962 UBC           0 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
                                963                 : 
                                964               0 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
  940 akapila                   965 EUB             :     {
  940 akapila                   966 UBC           0 :         pg_output_stream_start(ctx, data, txn, false);
                                967                 :     }
  873                           968               0 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
  940 akapila                   969 EUB             : 
  985 akapila                   970 UBC           0 :     OutputPluginPrepareWrite(ctx, true);
  985 akapila                   971 UIC           0 :     if (data->include_xids)
                                972               0 :         appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
                                973                 :     else
  906 drowley                   974               0 :         appendStringInfoString(ctx->out, "streaming truncate for transaction");
  985 akapila                   975               0 :     OutputPluginWrite(ctx, true);
                                976               0 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a