LCOV - differential code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Coverage Total Hit UNC LBC UIC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 95.1 % 755 718 7 7 23 17 409 74 218 11 416 9 77
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 40 40 39 1 36 3
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * pgoutput.c
       4                 :  *      Logical Replication output plugin
       5                 :  *
       6                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * IDENTIFICATION
       9                 :  *        src/backend/replication/pgoutput/pgoutput.c
      10                 :  *
      11                 :  *-------------------------------------------------------------------------
      12                 :  */
      13                 : #include "postgres.h"
      14                 : 
      15                 : #include "access/tupconvert.h"
      16                 : #include "catalog/partition.h"
      17                 : #include "catalog/pg_publication.h"
      18                 : #include "catalog/pg_publication_rel.h"
      19                 : #include "catalog/pg_subscription.h"
      20                 : #include "commands/defrem.h"
      21                 : #include "commands/subscriptioncmds.h"
      22                 : #include "executor/executor.h"
      23                 : #include "fmgr.h"
      24                 : #include "nodes/makefuncs.h"
      25                 : #include "optimizer/optimizer.h"
      26                 : #include "parser/parse_relation.h"
      27                 : #include "replication/logical.h"
      28                 : #include "replication/logicalproto.h"
      29                 : #include "replication/origin.h"
      30                 : #include "replication/pgoutput.h"
      31                 : #include "utils/builtins.h"
      32                 : #include "utils/inval.h"
      33                 : #include "utils/lsyscache.h"
      34                 : #include "utils/memutils.h"
      35                 : #include "utils/rel.h"
      36                 : #include "utils/syscache.h"
      37                 : #include "utils/varlena.h"
      38                 : 
      39 GIC         360 : PG_MODULE_MAGIC;
      40                 : 
      41                 : static void pgoutput_startup(LogicalDecodingContext *ctx,
      42                 :                              OutputPluginOptions *opt, bool is_init);
      43                 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      44                 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      45                 :                                ReorderBufferTXN *txn);
      46                 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      47                 :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      48                 : static void pgoutput_change(LogicalDecodingContext *ctx,
      49                 :                             ReorderBufferTXN *txn, Relation relation,
      50                 :                             ReorderBufferChange *change);
      51                 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      52                 :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      53                 :                               ReorderBufferChange *change);
      54                 : static void pgoutput_message(LogicalDecodingContext *ctx,
      55                 :                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      56                 :                              bool transactional, const char *prefix,
      57                 :                              Size sz, const char *message);
      58                 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      59                 :                                    RepOriginId origin_id);
      60                 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
      61                 :                                        ReorderBufferTXN *txn);
      62                 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
      63                 :                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      64                 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
      65                 :                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      66                 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
      67                 :                                            ReorderBufferTXN *txn,
      68                 :                                            XLogRecPtr prepare_end_lsn,
      69                 :                                            TimestampTz prepare_time);
      70                 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      71                 :                                   ReorderBufferTXN *txn);
      72                 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      73                 :                                  ReorderBufferTXN *txn);
      74                 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      75                 :                                   ReorderBufferTXN *txn,
      76                 :                                   XLogRecPtr abort_lsn);
      77                 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      78                 :                                    ReorderBufferTXN *txn,
      79                 :                                    XLogRecPtr commit_lsn);
      80                 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
      81                 :                                         ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      82                 : 
      83                 : static bool publications_valid;
      84                 : static bool in_streaming;
      85                 : static bool publish_no_origin;
      86                 : 
      87                 : static List *LoadPublications(List *pubnames);
      88                 : static void publication_invalidation_cb(Datum arg, int cacheid,
      89                 :                                         uint32 hashvalue);
      90                 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
      91                 :                                     LogicalDecodingContext *ctx,
      92                 :                                     Bitmapset *columns);
      93                 : static void send_repl_origin(LogicalDecodingContext *ctx,
      94                 :                              RepOriginId origin_id, XLogRecPtr origin_lsn,
      95                 :                              bool send_origin);
      96                 : 
      97                 : /*
      98                 :  * Only 3 publication actions are used for row filtering ("insert", "update",
      99                 :  * "delete"). See RelationSyncEntry.exprstate[].
     100                 :  */
     101                 : enum RowFilterPubAction
     102                 : {
     103                 :     PUBACTION_INSERT,
     104                 :     PUBACTION_UPDATE,
     105                 :     PUBACTION_DELETE
     106                 : };
     107                 : 
     108                 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
     109                 : 
     110                 : /*
     111                 :  * Entry in the map used to remember which relation schemas we sent.
     112                 :  *
     113                 :  * The schema_sent flag determines if the current schema record for the
     114                 :  * relation (and for its ancestor if publish_as_relid is set) was already
     115                 :  * sent to the subscriber (in which case we don't need to send it again).
     116                 :  *
     117                 :  * The schema cache on downstream is however updated only at commit time,
     118                 :  * and with streamed transactions the commit order may be different from
     119                 :  * the order the transactions are sent in. Also, the (sub) transactions
     120                 :  * might get aborted so we need to send the schema for each (sub) transaction
     121                 :  * so that we don't lose the schema information on abort. For handling this,
     122                 :  * we maintain the list of xids (streamed_txns) for those we have already sent
     123                 :  * the schema.
     124                 :  *
     125                 :  * For partitions, 'pubactions' considers not only the table's own
     126                 :  * publications, but also those of all of its ancestors.
     127                 :  */
     128                 : typedef struct RelationSyncEntry
     129                 : {
     130                 :     Oid         relid;          /* relation oid */
     131                 : 
     132                 :     bool        replicate_valid;    /* overall validity flag for entry */
     133                 : 
     134                 :     bool        schema_sent;
     135                 :     List       *streamed_txns;  /* streamed toplevel transactions with this
     136                 :                                  * schema */
     137                 : 
     138                 :     /* are we publishing this rel? */
     139                 :     PublicationActions pubactions;
     140                 : 
     141                 :     /*
     142                 :      * ExprState array for row filter. Different publication actions don't
     143                 :      * allow multiple expressions to always be combined into one, because
     144                 :      * updates or deletes restrict the column in expression to be part of the
     145                 :      * replica identity index whereas inserts do not have this restriction, so
     146                 :      * there is one ExprState per publication action.
     147                 :      */
     148                 :     ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
     149                 :     EState     *estate;         /* executor state used for row filter */
     150                 :     TupleTableSlot *new_slot;   /* slot for storing new tuple */
     151                 :     TupleTableSlot *old_slot;   /* slot for storing old tuple */
     152                 : 
     153                 :     /*
     154                 :      * OID of the relation to publish changes as.  For a partition, this may
     155                 :      * be set to one of its ancestors whose schema will be used when
     156                 :      * replicating changes, if publish_via_partition_root is set for the
     157                 :      * publication.
     158                 :      */
     159                 :     Oid         publish_as_relid;
     160                 : 
     161                 :     /*
     162                 :      * Map used when replicating using an ancestor's schema to convert tuples
     163                 :      * from partition's type to the ancestor's; NULL if publish_as_relid is
     164                 :      * same as 'relid' or if unnecessary due to partition and the ancestor
     165                 :      * having identical TupleDesc.
     166                 :      */
     167                 :     AttrMap    *attrmap;
     168                 : 
     169                 :     /*
     170                 :      * Columns included in the publication, or NULL if all columns are
     171                 :      * included implicitly.  Note that the attnums in this bitmap are not
     172                 :      * shifted by FirstLowInvalidHeapAttributeNumber.
     173                 :      */
     174                 :     Bitmapset  *columns;
     175                 : 
     176                 :     /*
     177                 :      * Private context to store additional data for this entry - state for the
     178                 :      * row filter expressions, column list, etc.
     179                 :      */
     180                 :     MemoryContext entry_cxt;
     181                 : } RelationSyncEntry;
     182                 : 
     183                 : /*
     184                 :  * Maintain a per-transaction level variable to track whether the transaction
     185                 :  * has sent BEGIN. BEGIN is only sent when the first change in a transaction
     186                 :  * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
     187                 :  * messages for empty transactions which saves network bandwidth.
     188                 :  *
     189                 :  * This optimization is not used for prepared transactions because if the
     190                 :  * WALSender restarts after prepare of a transaction and before commit prepared
     191                 :  * of the same transaction then we won't be able to figure out if we have
     192                 :  * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
     193                 :  * because we would have lost the in-memory txndata information that was
     194                 :  * present prior to the restart. This will result in sending a spurious
     195                 :  * COMMIT PREPARED without a corresponding prepared transaction at the
     196                 :  * downstream which would lead to an error when it tries to process it.
     197                 :  *
     198                 :  * XXX We could achieve this optimization by changing protocol to send
     199                 :  * additional information so that downstream can detect that the corresponding
     200                 :  * prepare has not been sent. However, adding such a check for every
     201                 :  * transaction in the downstream could be costly so we might want to do it
     202                 :  * optionally.
     203                 :  *
     204                 :  * We also don't have this optimization for streamed transactions because
     205                 :  * they can contain prepared transactions.
     206                 :  */
     207                 : typedef struct PGOutputTxnData
     208                 : {
     209                 :     bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
     210                 : } PGOutputTxnData;
     211                 : 
     212                 : /* Map used to remember which relation schemas we sent. */
     213                 : static HTAB *RelationSyncCache = NULL;
     214                 : 
     215                 : static void init_rel_sync_cache(MemoryContext cachectx);
     216                 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     217                 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
     218                 :                                              Relation relation);
     219                 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     220                 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     221                 :                                           uint32 hashvalue);
     222                 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     223                 :                                             TransactionId xid);
     224                 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     225                 :                                             TransactionId xid);
     226                 : static void init_tuple_slot(PGOutputData *data, Relation relation,
     227                 :                             RelationSyncEntry *entry);
     228                 : 
     229                 : /* row filter routines */
     230                 : static EState *create_estate_for_relation(Relation rel);
     231                 : static void pgoutput_row_filter_init(PGOutputData *data,
     232                 :                                      List *publications,
     233                 :                                      RelationSyncEntry *entry);
     234                 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
     235                 :                                           ExprContext *econtext);
     236                 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
     237                 :                                 TupleTableSlot **new_slot_ptr,
     238                 :                                 RelationSyncEntry *entry,
     239                 :                                 ReorderBufferChangeType *action);
     240                 : 
     241                 : /* column list routines */
     242                 : static void pgoutput_column_list_init(PGOutputData *data,
     243                 :                                       List *publications,
     244                 :                                       RelationSyncEntry *entry);
     245                 : 
     246                 : /*
     247                 :  * Specify output plugin callbacks
     248                 :  */
     249                 : void
     250 CBC         511 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     251                 : {
     252             511 :     cb->startup_cb = pgoutput_startup;
     253             511 :     cb->begin_cb = pgoutput_begin_txn;
     254             511 :     cb->change_cb = pgoutput_change;
     255             511 :     cb->truncate_cb = pgoutput_truncate;
     256 GIC         511 :     cb->message_cb = pgoutput_message;
     257 CBC         511 :     cb->commit_cb = pgoutput_commit_txn;
     258 ECB             : 
     259 CBC         511 :     cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
     260             511 :     cb->prepare_cb = pgoutput_prepare_txn;
     261             511 :     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     262             511 :     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     263 GIC         511 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     264             511 :     cb->shutdown_cb = pgoutput_shutdown;
     265 ECB             : 
     266                 :     /* transaction streaming */
     267 CBC         511 :     cb->stream_start_cb = pgoutput_stream_start;
     268             511 :     cb->stream_stop_cb = pgoutput_stream_stop;
     269             511 :     cb->stream_abort_cb = pgoutput_stream_abort;
     270             511 :     cb->stream_commit_cb = pgoutput_stream_commit;
     271             511 :     cb->stream_change_cb = pgoutput_change;
     272 GIC         511 :     cb->stream_message_cb = pgoutput_message;
     273 CBC         511 :     cb->stream_truncate_cb = pgoutput_truncate;
     274 ECB             :     /* transaction streaming - two-phase commit */
     275 GIC         511 :     cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
     276             511 : }
     277 ECB             : 
     278                 : static void
     279 GIC         281 : parse_output_parameters(List *options, PGOutputData *data)
     280 ECB             : {
     281                 :     ListCell   *lc;
     282 CBC         281 :     bool        protocol_version_given = false;
     283             281 :     bool        publication_names_given = false;
     284             281 :     bool        binary_option_given = false;
     285             281 :     bool        messages_option_given = false;
     286             281 :     bool        streaming_given = false;
     287 GIC         281 :     bool        two_phase_option_given = false;
     288 GNC         281 :     bool        origin_option_given = false;
     289 ECB             : 
     290 CBC         281 :     data->binary = false;
     291 GNC         281 :     data->streaming = LOGICALREP_STREAM_OFF;
     292 CBC         281 :     data->messages = false;
     293 GIC         281 :     data->two_phase = false;
     294 ECB             : 
     295 GIC        1170 :     foreach(lc, options)
     296 ECB             :     {
     297 GIC         889 :         DefElem    *defel = (DefElem *) lfirst(lc);
     298 ECB             : 
     299 GIC         889 :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     300                 : 
     301 ECB             :         /* Check each param, whether or not we recognize it */
     302 GIC         889 :         if (strcmp(defel->defname, "proto_version") == 0)
     303                 :         {
     304                 :             unsigned long parsed;
     305                 :             char       *endptr;
     306 ECB             : 
     307 GBC         281 :             if (protocol_version_given)
     308 UIC           0 :                 ereport(ERROR,
     309                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
     310 ECB             :                          errmsg("conflicting or redundant options")));
     311 GIC         281 :             protocol_version_given = true;
     312 ECB             : 
     313 CBC         281 :             errno = 0;
     314             281 :             parsed = strtoul(strVal(defel->arg), &endptr, 10);
     315 GBC         281 :             if (errno != 0 || *endptr != '\0')
     316 UIC           0 :                 ereport(ERROR,
     317                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     318                 :                          errmsg("invalid proto_version")));
     319 ECB             : 
     320 GBC         281 :             if (parsed > PG_UINT32_MAX)
     321 UIC           0 :                 ereport(ERROR,
     322                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     323                 :                          errmsg("proto_version \"%s\" out of range",
     324                 :                                 strVal(defel->arg))));
     325 ECB             : 
     326 GIC         281 :             data->protocol_version = (uint32) parsed;
     327 ECB             :         }
     328 GIC         608 :         else if (strcmp(defel->defname, "publication_names") == 0)
     329 ECB             :         {
     330 GBC         281 :             if (publication_names_given)
     331 UIC           0 :                 ereport(ERROR,
     332                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
     333 ECB             :                          errmsg("conflicting or redundant options")));
     334 GIC         281 :             publication_names_given = true;
     335 ECB             : 
     336 GIC         281 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     337 EUB             :                                        &data->publication_names))
     338 UIC           0 :                 ereport(ERROR,
     339                 :                         (errcode(ERRCODE_INVALID_NAME),
     340                 :                          errmsg("invalid publication_names syntax")));
     341 ECB             :         }
     342 GIC         327 :         else if (strcmp(defel->defname, "binary") == 0)
     343 ECB             :         {
     344 GBC          11 :             if (binary_option_given)
     345 UIC           0 :                 ereport(ERROR,
     346                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
     347 ECB             :                          errmsg("conflicting or redundant options")));
     348 GIC          11 :             binary_option_given = true;
     349 ECB             : 
     350 GIC          11 :             data->binary = defGetBoolean(defel);
     351 ECB             :         }
     352 GIC         316 :         else if (strcmp(defel->defname, "messages") == 0)
     353 ECB             :         {
     354 GBC           4 :             if (messages_option_given)
     355 UIC           0 :                 ereport(ERROR,
     356                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
     357 ECB             :                          errmsg("conflicting or redundant options")));
     358 GIC           4 :             messages_option_given = true;
     359 ECB             : 
     360 GIC           4 :             data->messages = defGetBoolean(defel);
     361 ECB             :         }
     362 GIC         312 :         else if (strcmp(defel->defname, "streaming") == 0)
     363 ECB             :         {
     364 GBC          33 :             if (streaming_given)
     365 UIC           0 :                 ereport(ERROR,
     366                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
     367 ECB             :                          errmsg("conflicting or redundant options")));
     368 GIC          33 :             streaming_given = true;
     369 ECB             : 
     370 GNC          33 :             data->streaming = defGetStreamingMode(defel);
     371 ECB             :         }
     372 GIC         279 :         else if (strcmp(defel->defname, "two_phase") == 0)
     373 ECB             :         {
     374 GBC           4 :             if (two_phase_option_given)
     375 UIC           0 :                 ereport(ERROR,
     376                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
     377 ECB             :                          errmsg("conflicting or redundant options")));
     378 GIC           4 :             two_phase_option_given = true;
     379 ECB             : 
     380 GIC           4 :             data->two_phase = defGetBoolean(defel);
     381 ECB             :         }
     382 GNC         275 :         else if (strcmp(defel->defname, "origin") == 0)
     383                 :         {
     384             275 :             if (origin_option_given)
     385 UNC           0 :                 ereport(ERROR,
     386                 :                         errcode(ERRCODE_SYNTAX_ERROR),
     387                 :                         errmsg("conflicting or redundant options"));
     388 GNC         275 :             origin_option_given = true;
     389                 : 
     390             275 :             data->origin = defGetString(defel);
     391             275 :             if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
     392               9 :                 publish_no_origin = true;
     393             266 :             else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
     394             266 :                 publish_no_origin = false;
     395                 :             else
     396 UNC           0 :                 ereport(ERROR,
     397                 :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     398                 :                         errmsg("unrecognized origin value: \"%s\"", data->origin));
     399                 :         }
     400                 :         else
     401 LBC           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     402 EUB             :     }
     403 GIC         281 : }
     404                 : 
     405 ECB             : /*
     406                 :  * Initialize this plugin
     407                 :  */
     408                 : static void
     409 CBC         511 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     410 ECB             :                  bool is_init)
     411                 : {
     412 GIC         511 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     413 EUB             :     static bool publication_callback_registered = false;
     414                 : 
     415                 :     /* Create our memory context for private allocations. */
     416 GIC         511 :     data->context = AllocSetContextCreate(ctx->context,
     417                 :                                           "logical replication output context",
     418 EUB             :                                           ALLOCSET_DEFAULT_SIZES);
     419                 : 
     420 CBC         511 :     data->cachectx = AllocSetContextCreate(ctx->context,
     421                 :                                            "logical replication cache context",
     422                 :                                            ALLOCSET_DEFAULT_SIZES);
     423                 : 
     424 GIC         511 :     ctx->output_plugin_private = data;
     425                 : 
     426 ECB             :     /* This plugin uses binary protocol. */
     427 GIC         511 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     428                 : 
     429 ECB             :     /*
     430                 :      * This is replication start and not slot initialization.
     431                 :      *
     432                 :      * Parse and validate options passed by the client.
     433                 :      */
     434 GIC         511 :     if (!is_init)
     435                 :     {
     436                 :         /* Parse the params and ERROR if we see any we don't recognize */
     437 CBC         281 :         parse_output_parameters(ctx->output_plugin_options, data);
     438                 : 
     439                 :         /* Check if we support requested protocol */
     440 GIC         281 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     441 LBC           0 :             ereport(ERROR,
     442                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     443                 :                      errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
     444 ECB             :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     445                 : 
     446 GIC         281 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     447 UIC           0 :             ereport(ERROR,
     448                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     449                 :                      errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
     450                 :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     451 ECB             : 
     452 GNC         281 :         if (data->publication_names == NIL)
     453 UIC           0 :             ereport(ERROR,
     454 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     455                 :                      errmsg("publication_names parameter missing")));
     456                 : 
     457                 :         /*
     458 EUB             :          * Decide whether to enable streaming. It is disabled by default, in
     459                 :          * which case we just update the flag in decoding context. Otherwise
     460                 :          * we only allow it with sufficient version of the protocol, and when
     461                 :          * the output plugin supports it.
     462                 :          */
     463 GNC         281 :         if (data->streaming == LOGICALREP_STREAM_OFF)
     464 GBC         248 :             ctx->streaming = false;
     465 GNC          33 :         else if (data->streaming == LOGICALREP_STREAM_ON &&
     466              26 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     467 UIC           0 :             ereport(ERROR,
     468                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     469                 :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     470 ECB             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     471 GNC          33 :         else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
     472               7 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
     473 UNC           0 :             ereport(ERROR,
     474                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     475                 :                      errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
     476                 :                             data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
     477 GBC          33 :         else if (!ctx->streaming)
     478 UIC           0 :             ereport(ERROR,
     479                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     480                 :                      errmsg("streaming requested, but not supported by output plugin")));
     481                 : 
     482                 :         /* Also remember we're currently not streaming any transaction. */
     483 GIC         281 :         in_streaming = false;
     484                 : 
     485                 :         /*
     486                 :          * Here, we just check whether the two-phase option is passed by
     487 ECB             :          * plugin and decide whether to enable it at later point of time. It
     488                 :          * remains enabled if the previous start-up has done so. But we only
     489                 :          * allow the option to be passed in with sufficient version of the
     490                 :          * protocol, and when the output plugin supports it.
     491 EUB             :          */
     492 GIC         281 :         if (!data->two_phase)
     493             277 :             ctx->twophase_opt_given = false;
     494               4 :         else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
     495 LBC           0 :             ereport(ERROR,
     496 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     497 EUB             :                      errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
     498                 :                             data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
     499 GIC           4 :         else if (!ctx->twophase)
     500 UIC           0 :             ereport(ERROR,
     501 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     502 EUB             :                      errmsg("two-phase commit requested, but not supported by output plugin")));
     503                 :         else
     504 GIC           4 :             ctx->twophase_opt_given = true;
     505                 : 
     506                 :         /* Init publication state. */
     507 CBC         281 :         data->publications = NIL;
     508 GIC         281 :         publications_valid = false;
     509                 : 
     510                 :         /*
     511                 :          * Register callback for pg_publication if we didn't already do that
     512                 :          * during some previous call in this process.
     513                 :          */
     514             281 :         if (!publication_callback_registered)
     515                 :         {
     516 CBC         281 :             CacheRegisterSyscacheCallback(PUBLICATIONOID,
     517 ECB             :                                           publication_invalidation_cb,
     518                 :                                           (Datum) 0);
     519 GBC         281 :             publication_callback_registered = true;
     520                 :         }
     521                 : 
     522                 :         /* Initialize relation schema cache. */
     523 CBC         281 :         init_rel_sync_cache(CacheMemoryContext);
     524 EUB             :     }
     525                 :     else
     526                 :     {
     527                 :         /*
     528 ECB             :          * Disable the streaming and prepared transactions during the slot
     529                 :          * initialization mode.
     530                 :          */
     531 CBC         230 :         ctx->streaming = false;
     532             230 :         ctx->twophase = false;
     533                 :     }
     534 GIC         511 : }
     535                 : 
     536                 : /*
     537                 :  * BEGIN callback.
     538 ECB             :  *
     539                 :  * Don't send the BEGIN message here instead postpone it until the first
     540                 :  * change. In logical replication, a common scenario is to replicate a set of
     541                 :  * tables (instead of all tables) and transactions whose changes were on
     542                 :  * the table(s) that are not published will produce empty transactions. These
     543                 :  * empty transactions will send BEGIN and COMMIT messages to subscribers,
     544                 :  * using bandwidth on something with little/no use for logical replication.
     545                 :  */
     546                 : static void
     547 CBC         594 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     548                 : {
     549 GIC         594 :     PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
     550                 :                                                       sizeof(PGOutputTxnData));
     551                 : 
     552             594 :     txn->output_plugin_private = txndata;
     553             594 : }
     554                 : 
     555 ECB             : /*
     556                 :  * Send BEGIN.
     557                 :  *
     558                 :  * This is called while processing the first change of the transaction.
     559                 :  */
     560                 : static void
     561 GIC         335 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     562                 : {
     563             335 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     564             335 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     565                 : 
     566             335 :     Assert(txndata);
     567             335 :     Assert(!txndata->sent_begin_txn);
     568                 : 
     569             335 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     570             335 :     logicalrep_write_begin(ctx->out, txn);
     571 CBC         335 :     txndata->sent_begin_txn = true;
     572                 : 
     573             335 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     574                 :                      send_replication_origin);
     575                 : 
     576             335 :     OutputPluginWrite(ctx, true);
     577             335 : }
     578                 : 
     579                 : /*
     580                 :  * COMMIT callback
     581                 :  */
     582                 : static void
     583 GIC         590 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     584                 :                     XLogRecPtr commit_lsn)
     585 ECB             : {
     586 GIC         590 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     587 ECB             :     bool        sent_begin_txn;
     588                 : 
     589 GIC         590 :     Assert(txndata);
     590 ECB             : 
     591                 :     /*
     592                 :      * We don't need to send the commit message unless some relevant change
     593                 :      * from this transaction has been sent to the downstream.
     594                 :      */
     595 CBC         590 :     sent_begin_txn = txndata->sent_begin_txn;
     596 GNC         590 :     OutputPluginUpdateProgress(ctx, !sent_begin_txn);
     597 CBC         590 :     pfree(txndata);
     598 GIC         590 :     txn->output_plugin_private = NULL;
     599                 : 
     600 CBC         590 :     if (!sent_begin_txn)
     601 ECB             :     {
     602 GIC         256 :         elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
     603             256 :         return;
     604                 :     }
     605                 : 
     606             334 :     OutputPluginPrepareWrite(ctx, true);
     607 CBC         334 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     608 GIC         334 :     OutputPluginWrite(ctx, true);
     609                 : }
     610 ECB             : 
     611                 : /*
     612                 :  * BEGIN PREPARE callback
     613                 :  */
     614                 : static void
     615 GIC          18 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     616                 : {
     617              18 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     618                 : 
     619 CBC          18 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     620              18 :     logicalrep_write_begin_prepare(ctx->out, txn);
     621 ECB             : 
     622 CBC          18 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     623                 :                      send_replication_origin);
     624 ECB             : 
     625 GIC          18 :     OutputPluginWrite(ctx, true);
     626 CBC          18 : }
     627 ECB             : 
     628                 : /*
     629                 :  * PREPARE callback
     630                 :  */
     631                 : static void
     632 CBC          18 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     633                 :                      XLogRecPtr prepare_lsn)
     634                 : {
     635 GNC          18 :     OutputPluginUpdateProgress(ctx, false);
     636                 : 
     637 GIC          18 :     OutputPluginPrepareWrite(ctx, true);
     638              18 :     logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
     639 CBC          18 :     OutputPluginWrite(ctx, true);
     640 GIC          18 : }
     641 ECB             : 
     642                 : /*
     643                 :  * COMMIT PREPARED callback
     644                 :  */
     645                 : static void
     646 CBC          21 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     647                 :                              XLogRecPtr commit_lsn)
     648                 : {
     649 GNC          21 :     OutputPluginUpdateProgress(ctx, false);
     650 ECB             : 
     651 GIC          21 :     OutputPluginPrepareWrite(ctx, true);
     652              21 :     logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
     653              21 :     OutputPluginWrite(ctx, true);
     654              21 : }
     655                 : 
     656 ECB             : /*
     657                 :  * ROLLBACK PREPARED callback
     658                 :  */
     659                 : static void
     660 GIC           7 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     661 ECB             :                                ReorderBufferTXN *txn,
     662                 :                                XLogRecPtr prepare_end_lsn,
     663                 :                                TimestampTz prepare_time)
     664                 : {
     665 GNC           7 :     OutputPluginUpdateProgress(ctx, false);
     666                 : 
     667 GIC           7 :     OutputPluginPrepareWrite(ctx, true);
     668               7 :     logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
     669                 :                                        prepare_time);
     670 CBC           7 :     OutputPluginWrite(ctx, true);
     671 GIC           7 : }
     672                 : 
     673 ECB             : /*
     674                 :  * Write the current schema of the relation and its ancestor (if any) if not
     675                 :  * done yet.
     676                 :  */
     677                 : static void
     678 CBC      181950 : maybe_send_schema(LogicalDecodingContext *ctx,
     679                 :                   ReorderBufferChange *change,
     680                 :                   Relation relation, RelationSyncEntry *relentry)
     681                 : {
     682                 :     bool        schema_sent;
     683 GIC      181950 :     TransactionId xid = InvalidTransactionId;
     684 CBC      181950 :     TransactionId topxid = InvalidTransactionId;
     685                 : 
     686                 :     /*
     687                 :      * Remember XID of the (sub)transaction for the change. We don't care if
     688                 :      * it's top-level transaction or not (we have already sent that XID in
     689 ECB             :      * start of the current streaming block).
     690                 :      *
     691                 :      * If we're not in a streaming block, just use InvalidTransactionId and
     692                 :      * the write methods will not include it.
     693                 :      */
     694 CBC      181950 :     if (in_streaming)
     695          175901 :         xid = change->txn->xid;
     696                 : 
     697 GNC      181950 :     if (rbtxn_is_subtxn(change->txn))
     698           10169 :         topxid = rbtxn_get_toptxn(change->txn)->xid;
     699                 :     else
     700 GIC      171781 :         topxid = xid;
     701                 : 
     702 ECB             :     /*
     703                 :      * Do we need to send the schema? We do track streamed transactions
     704                 :      * separately, because those may be applied later (and the regular
     705                 :      * transactions won't see their effects until then) and in an order that
     706                 :      * we don't know at this point.
     707                 :      *
     708                 :      * XXX There is a scope of optimization here. Currently, we always send
     709                 :      * the schema first time in a streaming transaction but we can probably
     710                 :      * avoid that by checking 'relentry->schema_sent' flag. However, before
     711                 :      * doing that we need to study its impact on the case where we have a mix
     712                 :      * of streaming and non-streaming transactions.
     713                 :      */
     714 GIC      181950 :     if (in_streaming)
     715          175901 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     716                 :     else
     717            6049 :         schema_sent = relentry->schema_sent;
     718 ECB             : 
     719                 :     /* Nothing to do if we already sent the schema. */
     720 GIC      181950 :     if (schema_sent)
     721 CBC      181693 :         return;
     722 ECB             : 
     723                 :     /*
     724                 :      * Send the schema.  If the changes will be published using an ancestor's
     725                 :      * schema, not the relation's own, send that ancestor's schema before
     726                 :      * sending relation's own (XXX - maybe sending only the former suffices?).
     727                 :      */
     728 GIC         257 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     729                 :     {
     730              25 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     731                 : 
     732              25 :         send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
     733              25 :         RelationClose(ancestor);
     734                 :     }
     735                 : 
     736             257 :     send_relation_and_attrs(relation, xid, ctx, relentry->columns);
     737                 : 
     738 CBC         257 :     if (in_streaming)
     739              64 :         set_schema_sent_in_streamed_txn(relentry, topxid);
     740                 :     else
     741             193 :         relentry->schema_sent = true;
     742                 : }
     743                 : 
     744 ECB             : /*
     745                 :  * Sends a relation
     746                 :  */
     747                 : static void
     748 GIC         282 : send_relation_and_attrs(Relation relation, TransactionId xid,
     749                 :                         LogicalDecodingContext *ctx,
     750                 :                         Bitmapset *columns)
     751                 : {
     752 CBC         282 :     TupleDesc   desc = RelationGetDescr(relation);
     753                 :     int         i;
     754 ECB             : 
     755                 :     /*
     756                 :      * Write out type info if needed.  We do that only for user-created types.
     757                 :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     758                 :      * objects with hand-assigned OIDs to be "built in", not for instance any
     759                 :      * function or type defined in the information_schema. This is important
     760                 :      * because only hand-assigned OIDs can be expected to remain stable across
     761                 :      * major versions.
     762                 :      */
     763 CBC         857 :     for (i = 0; i < desc->natts; i++)
     764                 :     {
     765             575 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     766                 : 
     767 GIC         575 :         if (att->attisdropped || att->attgenerated)
     768               5 :             continue;
     769                 : 
     770             570 :         if (att->atttypid < FirstGenbkiObjectId)
     771             552 :             continue;
     772 ECB             : 
     773                 :         /* Skip this attribute if it's not present in the column list */
     774 GIC          18 :         if (columns != NULL && !bms_is_member(att->attnum, columns))
     775 UIC           0 :             continue;
     776 ECB             : 
     777 GIC          18 :         OutputPluginPrepareWrite(ctx, false);
     778              18 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
     779              18 :         OutputPluginWrite(ctx, false);
     780                 :     }
     781                 : 
     782             282 :     OutputPluginPrepareWrite(ctx, false);
     783             282 :     logicalrep_write_rel(ctx->out, xid, relation, columns);
     784             282 :     OutputPluginWrite(ctx, false);
     785             282 : }
     786                 : 
     787 ECB             : /*
     788                 :  * Executor state preparation for evaluation of row filter expressions for the
     789                 :  * specified relation.
     790                 :  */
     791                 : static EState *
     792 CBC          16 : create_estate_for_relation(Relation rel)
     793                 : {
     794 ECB             :     EState     *estate;
     795                 :     RangeTblEntry *rte;
     796 GNC          16 :     List       *perminfos = NIL;
     797                 : 
     798 GIC          16 :     estate = CreateExecutorState();
     799 ECB             : 
     800 GBC          16 :     rte = makeNode(RangeTblEntry);
     801 GIC          16 :     rte->rtekind = RTE_RELATION;
     802 CBC          16 :     rte->relid = RelationGetRelid(rel);
     803              16 :     rte->relkind = rel->rd_rel->relkind;
     804              16 :     rte->rellockmode = AccessShareLock;
     805                 : 
     806 GNC          16 :     addRTEPermissionInfo(&perminfos, rte);
     807                 : 
     808              16 :     ExecInitRangeTable(estate, list_make1(rte), perminfos);
     809                 : 
     810 CBC          16 :     estate->es_output_cid = GetCurrentCommandId(false);
     811 ECB             : 
     812 CBC          16 :     return estate;
     813 ECB             : }
     814                 : 
     815                 : /*
     816                 :  * Evaluates row filter.
     817                 :  *
     818                 :  * If the row filter evaluates to NULL, it is taken as false i.e. the change
     819                 :  * isn't replicated.
     820                 :  */
     821                 : static bool
     822 GIC          36 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
     823                 : {
     824 ECB             :     Datum       ret;
     825                 :     bool        isnull;
     826                 : 
     827 GIC          36 :     Assert(state != NULL);
     828 ECB             : 
     829 CBC          36 :     ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
     830 ECB             : 
     831 CBC          36 :     elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
     832 ECB             :          isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
     833                 :          isnull ? "true" : "false");
     834                 : 
     835 GIC          36 :     if (isnull)
     836 CBC           1 :         return false;
     837                 : 
     838              35 :     return DatumGetBool(ret);
     839                 : }
     840 ECB             : 
     841                 : /*
     842                 :  * Make sure the per-entry memory context exists.
     843                 :  */
     844                 : static void
     845 GIC          53 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
     846                 : {
     847                 :     Relation    relation;
     848                 : 
     849                 :     /* The context may already exist, in which case bail out. */
     850 CBC          53 :     if (entry->entry_cxt)
     851 GIC           2 :         return;
     852                 : 
     853              51 :     relation = RelationIdGetRelation(entry->publish_as_relid);
     854                 : 
     855 CBC          51 :     entry->entry_cxt = AllocSetContextCreate(data->cachectx,
     856                 :                                              "entry private context",
     857 ECB             :                                              ALLOCSET_SMALL_SIZES);
     858                 : 
     859 CBC          51 :     MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
     860                 :                                       RelationGetRelationName(relation));
     861                 : }
     862                 : 
     863 ECB             : /*
     864                 :  * Initialize the row filter.
     865                 :  */
     866                 : static void
     867 GIC         222 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
     868                 :                          RelationSyncEntry *entry)
     869                 : {
     870                 :     ListCell   *lc;
     871             222 :     List       *rfnodes[] = {NIL, NIL, NIL};    /* One per pubaction */
     872             222 :     bool        no_filter[] = {false, false, false};    /* One per pubaction */
     873 ECB             :     MemoryContext oldctx;
     874                 :     int         idx;
     875 GIC         222 :     bool        has_filter = true;
     876             222 :     Oid         schemaid = get_rel_namespace(entry->publish_as_relid);
     877                 : 
     878 ECB             :     /*
     879                 :      * Find if there are any row filters for this relation. If there are, then
     880                 :      * prepare the necessary ExprState and cache it in entry->exprstate. To
     881                 :      * build an expression state, we need to ensure the following:
     882                 :      *
     883                 :      * All the given publication-table mappings must be checked.
     884                 :      *
     885                 :      * Multiple publications might have multiple row filters for this
     886                 :      * relation. Since row filter usage depends on the DML operation, there
     887                 :      * are multiple lists (one for each operation) to which row filters will
     888                 :      * be appended.
     889                 :      *
     890                 :      * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row
     891                 :      * filter expression" so it takes precedence.
     892                 :      */
     893 GIC         242 :     foreach(lc, publications)
     894                 :     {
     895 CBC         226 :         Publication *pub = lfirst(lc);
     896 GIC         226 :         HeapTuple   rftuple = NULL;
     897             226 :         Datum       rfdatum = 0;
     898             226 :         bool        pub_no_filter = true;
     899 ECB             : 
     900                 :         /*
     901                 :          * If the publication is FOR ALL TABLES, or the publication includes a
     902                 :          * FOR TABLES IN SCHEMA where the table belongs to the referred
     903                 :          * schema, then it is treated the same as if there are no row filters
     904                 :          * (even if other publications have a row filter).
     905                 :          */
     906 GIC         226 :         if (!pub->alltables &&
     907             166 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     908                 :                                    ObjectIdGetDatum(schemaid),
     909                 :                                    ObjectIdGetDatum(pub->oid)))
     910                 :         {
     911                 :             /*
     912                 :              * Check for the presence of a row filter in this publication.
     913                 :              */
     914             160 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     915                 :                                       ObjectIdGetDatum(entry->publish_as_relid),
     916                 :                                       ObjectIdGetDatum(pub->oid));
     917                 : 
     918             160 :             if (HeapTupleIsValid(rftuple))
     919                 :             {
     920                 :                 /* Null indicates no filter. */
     921 CBC         148 :                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     922                 :                                           Anum_pg_publication_rel_prqual,
     923 ECB             :                                           &pub_no_filter);
     924                 :             }
     925                 :         }
     926                 : 
     927 GIC         226 :         if (pub_no_filter)
     928                 :         {
     929             213 :             if (rftuple)
     930             135 :                 ReleaseSysCache(rftuple);
     931                 : 
     932             213 :             no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
     933             213 :             no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
     934 CBC         213 :             no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
     935 ECB             : 
     936                 :             /*
     937                 :              * Quick exit if all the DML actions are publicized via this
     938                 :              * publication.
     939                 :              */
     940 GIC         213 :             if (no_filter[PUBACTION_INSERT] &&
     941             213 :                 no_filter[PUBACTION_UPDATE] &&
     942 CBC         206 :                 no_filter[PUBACTION_DELETE])
     943                 :             {
     944 GIC         206 :                 has_filter = false;
     945             206 :                 break;
     946 ECB             :             }
     947                 : 
     948                 :             /* No additional work for this publication. Next one. */
     949 CBC           7 :             continue;
     950                 :         }
     951                 : 
     952                 :         /* Form the per pubaction row filter lists. */
     953 GIC          13 :         if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
     954              13 :             rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
     955 CBC          13 :                                                 TextDatumGetCString(rfdatum));
     956 GIC          13 :         if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
     957 CBC          13 :             rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
     958              13 :                                                 TextDatumGetCString(rfdatum));
     959 GIC          13 :         if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
     960 CBC          13 :             rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
     961              13 :                                                 TextDatumGetCString(rfdatum));
     962 ECB             : 
     963 GIC          13 :         ReleaseSysCache(rftuple);
     964                 :     }                           /* loop all subscribed publications */
     965                 : 
     966                 :     /* Clean the row filter */
     967             888 :     for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     968 ECB             :     {
     969 CBC         666 :         if (no_filter[idx])
     970 ECB             :         {
     971 GIC         627 :             list_free_deep(rfnodes[idx]);
     972 CBC         627 :             rfnodes[idx] = NIL;
     973 ECB             :         }
     974                 :     }
     975                 : 
     976 GIC         222 :     if (has_filter)
     977 ECB             :     {
     978 GIC          16 :         Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
     979                 : 
     980              16 :         pgoutput_ensure_entry_cxt(data, entry);
     981 ECB             : 
     982                 :         /*
     983                 :          * Now all the filters for all pubactions are known. Combine them when
     984                 :          * their pubactions are the same.
     985                 :          */
     986 CBC          16 :         oldctx = MemoryContextSwitchTo(entry->entry_cxt);
     987              16 :         entry->estate = create_estate_for_relation(relation);
     988              64 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     989 ECB             :         {
     990 GIC          48 :             List       *filters = NIL;
     991 ECB             :             Expr       *rfnode;
     992                 : 
     993 GIC          48 :             if (rfnodes[idx] == NIL)
     994              21 :                 continue;
     995 ECB             : 
     996 GIC          57 :             foreach(lc, rfnodes[idx])
     997 CBC          30 :                 filters = lappend(filters, stringToNode((char *) lfirst(lc)));
     998                 : 
     999 ECB             :             /* combine the row filter and cache the ExprState */
    1000 CBC          27 :             rfnode = make_orclause(filters);
    1001 GIC          27 :             entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
    1002                 :         }                       /* for each pubaction */
    1003              16 :         MemoryContextSwitchTo(oldctx);
    1004 ECB             : 
    1005 GIC          16 :         RelationClose(relation);
    1006 ECB             :     }
    1007 GIC         222 : }
    1008 ECB             : 
    1009                 : /*
    1010                 :  * Initialize the column list.
                         :  *
                         :  * Sets entry->columns from the column lists that the given publications
                         :  * define for entry->publish_as_relid.  A publication contributes a NULL
                         :  * bitmap ("no column list") when it is FOR ALL TABLES, when it has no
                         :  * pg_publication_rel row for the relation, when that row's prattrs is
                         :  * null, or when its list covers every column that is neither dropped nor
                         :  * generated.  The first publication's result is stored; every later
                         :  * publication must produce an equal bitmap, otherwise an ERROR is raised.
    1011                 :  */
    1012                 : static void
    1013 GIC         222 : pgoutput_column_list_init(PGOutputData *data, List *publications,
    1014 ECB             :                           RelationSyncEntry *entry)
    1015                 : {
    1016                 :     ListCell   *lc;
    1017 GIC         222 :     bool        first = true;
    1018 CBC         222 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1019                 : 
    1020                 :     /*
    1021 ECB             :      * Find if there are any column lists for this relation. If there are,
    1022                 :      * build a bitmap using the column lists.
    1023                 :      *
    1024                 :      * Multiple publications might have multiple column lists for this
    1025                 :      * relation.
    1026                 :      *
    1027                 :      * Note that we don't support the case where the column list is different
    1028                 :      * for the same table when combining publications. See comments atop
    1029                 :      * fetch_table_list. But one can later change the publication so we still
    1030                 :      * need to check all the given publication-table mappings and report an
    1031                 :      * error if any publications have a different column list.
    1032                 :      *
    1033                 :      * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
                         :      *
                         :      * 'first' tracks whether entry->columns has been assigned yet: the first
                         :      * publication's bitmap is stored as is, and each subsequent one is only
                         :      * compared against it.
    1034                 :      */
    1035 CBC         454 :     foreach(lc, publications)
    1036                 :     {
    1037 GIC         233 :         Publication *pub = lfirst(lc);
    1038             233 :         HeapTuple   cftuple = NULL;
    1039             233 :         Datum       cfdatum = 0;
    1040             233 :         Bitmapset  *cols = NULL;
    1041 ECB             : 
    1042                 :         /*
    1043                 :          * If the publication is FOR ALL TABLES then it is treated the same as
    1044                 :          * if there are no column lists (even if other publications have a
    1045                 :          * list).  In that case cols simply stays NULL.
    1046                 :          */
    1047 GIC         233 :         if (!pub->alltables)
    1048                 :         {
    1049             172 :             bool        pub_no_list = true;
    1050                 : 
    1051                 :             /*
    1052                 :              * Check for the presence of a column list in this publication.
    1053                 :              *
    1054                 :              * Note: If we find no pg_publication_rel row, it's a publication
    1055                 :              * defined for a whole schema, so it can't have a column list,
    1056                 :              * just like a FOR ALL TABLES publication.
    1057                 :              */
    1058             172 :             cftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1059                 :                                       ObjectIdGetDatum(entry->publish_as_relid),
    1060                 :                                       ObjectIdGetDatum(pub->oid));
    1061                 : 
    1062             172 :             if (HeapTupleIsValid(cftuple))
    1063 ECB             :             {
    1064                 :                 /* Look up the column list attribute; pub_no_list is its null flag. */
    1065 CBC         153 :                 cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
    1066 ECB             :                                           Anum_pg_publication_rel_prattrs,
    1067                 :                                           &pub_no_list);
    1068                 : 
    1069                 :                 /* Build the column list bitmap in the per-entry context. */
    1070 GIC         153 :                 if (!pub_no_list)   /* when not null */
    1071                 :                 {
    1072                 :                     int         i;
    1073 GNC          37 :                     int         nliveatts = 0;
    1074              37 :                     TupleDesc   desc = RelationGetDescr(relation);
    1075                 : 
    1076 GIC          37 :                     pgoutput_ensure_entry_cxt(data, entry);
    1077                 : 
    1078              37 :                     cols = pub_collist_to_bitmapset(cols, cfdatum,
    1079 ECB             :                                                     entry->entry_cxt);
    1080                 : 
    1081                 :                     /* Count the attributes that are neither dropped nor generated. */
    1082 GNC         155 :                     for (i = 0; i < desc->natts; i++)
    1083                 :                     {
    1084             118 :                         Form_pg_attribute att = TupleDescAttr(desc, i);
    1085                 : 
    1086             118 :                         if (att->attisdropped || att->attgenerated)
    1087               2 :                             continue;
    1088                 : 
    1089             116 :                         nliveatts++;
    1090                 :                     }
    1091                 : 
    1092 ECB             :                     /*
    1093                 :                      * If the column list has as many members as the table
    1094                 :                      * has live columns, set it to NULL so that it compares
                         :                      * equal to a publication without any column list in the
                         :                      * bms_equal() check below.
    1095                 :                      */
    1096 GNC          37 :                     if (bms_num_members(cols) == nliveatts)
    1097                 :                     {
    1098 GIC           7 :                         bms_free(cols);
    1099               7 :                         cols = NULL;
    1100                 :                     }
    1101 ECB             :                 }
    1102                 : 
    1103 GIC         153 :                 ReleaseSysCache(cftuple);
    1104                 :             }
    1105 ECB             :         }
    1106                 : 
    1107 GIC         233 :         if (first)
    1108 ECB             :         {
    1109 GIC         222 :             entry->columns = cols;
    1110             222 :             first = false;
    1111                 :         }
    1112              11 :         else if (!bms_equal(entry->columns, cols))
    1113 CBC           1 :             ereport(ERROR,
    1114                 :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1115                 :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    1116 ECB             :                            get_namespace_name(RelationGetNamespace(relation)),
    1117                 :                            RelationGetRelationName(relation)));
    1118                 :     }                           /* loop all subscribed publications */
    1119                 : 
    1120 GIC         221 :     RelationClose(relation);
    1121 CBC         221 : }
    1122                 : 
    1123                 : /*
    1124                 :  * Initialize the slot for storing new and old tuples, and build the map that
    1125 ECB             :  * will be used to convert the relation's tuples into the ancestor's format.
                         :  *
                         :  * The slots, the tuple descriptor copies they use and the attribute map are
                         :  * all allocated in data->cachectx.  The map is only built when the entry
                         :  * publishes as a relation other than the one passed in; otherwise
                         :  * entry->attrmap is left as the caller set it.
    1126                 :  */
    1127                 : static void
    1128 GIC         222 : init_tuple_slot(PGOutputData *data, Relation relation,
    1129 ECB             :                 RelationSyncEntry *entry)
    1130                 : {
    1131                 :     MemoryContext oldctx;
    1132                 :     TupleDesc   oldtupdesc;
    1133                 :     TupleDesc   newtupdesc;
    1134                 : 
    1135 GIC         222 :     oldctx = MemoryContextSwitchTo(data->cachectx);
    1136                 : 
    1137                 :     /*
    1138                 :      * Create tuple table slots. Create a copy of the TupleDesc as it needs to
    1139 ECB             :      * live as long as the cache remains.
    1140                 :      */
    1141 CBC         222 :     oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
    1142             222 :     newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
    1143                 : 
    1144 GIC         222 :     entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    1145             222 :     entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
    1146 ECB             : 
    1147 GIC         222 :     MemoryContextSwitchTo(oldctx);
    1148                 : 
    1149                 :     /*
    1150 ECB             :      * Cache the map that will be used to convert the relation's tuples into
    1151                 :      * the ancestor's format, if needed.
    1152                 :      */
    1153 CBC         222 :     if (entry->publish_as_relid != RelationGetRelid(relation))
    1154                 :     {
    1155              26 :         Relation    ancestor = RelationIdGetRelation(entry->publish_as_relid);
    1156              26 :         TupleDesc   indesc = RelationGetDescr(relation);
    1157 GIC          26 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
    1158                 : 
    1159                 :         /*
                         :          * Build the map in data->cachectx, like the slots above, so that
                         :          * it is released together with the rest of the cached state
                         :          * instead of accumulating in the session-lifetime
                         :          * CacheMemoryContext.
                         :          */
    1160              26 :         oldctx = MemoryContextSwitchTo(data->cachectx);
    1161                 : 
    1162 GNC          26 :         entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
    1163 ECB             : 
    1164 CBC          26 :         MemoryContextSwitchTo(oldctx);
    1165 GIC          26 :         RelationClose(ancestor);
    1166                 :     }
    1167             222 : }
    1168                 : 
    1169                 : /*
    1170                 :  * Change is checked against the row filter if any.
    1171 ECB             :  *
    1172                 :  * Returns true if the change is to be replicated, else false.
    1173                 :  *
    1174                 :  * For inserts, evaluate the row filter for new tuple.
    1175                 :  * For deletes, evaluate the row filter for old tuple.
    1176                 :  * For updates, evaluate the row filter for old and new tuple.
    1177                 :  *
    1178                 :  * For updates, if both evaluations are true, we allow sending the UPDATE and
    1179                 :  * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
    1180                 :  * only one of the tuples matches the row filter expression, we transform
    1181                 :  * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
    1182                 :  * following rules:
    1183                 :  *
    1184                 :  * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
    1185                 :  * Case 2: old-row (no match)    new row (match)     -> INSERT
    1186                 :  * Case 3: old-row (match)       new-row (no match)  -> DELETE
    1187                 :  * Case 4: old-row (match)       new row (match)     -> UPDATE
    1188                 :  *
    1189                 :  * The new action is updated in the action parameter.
    1190                 :  *
    1191                 :  * The new slot could be updated when transforming the UPDATE into INSERT,
    1192                 :  * because the original new tuple might not have column values from the replica
    1193                 :  * identity.
    1194                 :  *
    1195                 :  * Examples:
    1196                 :  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
    1197                 :  * Since the old tuple satisfies, the initial table synchronization copied this
    1198                 :  * row (or another method was used to guarantee that there is data
    1199                 :  * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
    1200                 :  * row filter, so from a data consistency perspective, that row should be
    1201                 :  * removed on the subscriber. The UPDATE should be transformed into a DELETE
    1202                 :  * statement and be sent to the subscriber. Keeping this row on the subscriber
    1203                 :  * is undesirable because it doesn't reflect what was defined in the row filter
    1204                 :  * expression on the publisher. This row on the subscriber would likely not be
    1205                 :  * modified by replication again. If someone inserted a new row with the same
    1206                 :  * old identifier, replication could stop due to a constraint violation.
    1207                 :  *
    1208                 :  * Let's say the old tuple doesn't match the row filter but the new tuple does.
    1209                 :  * Since the old tuple doesn't satisfy, the initial table synchronization
    1210                 :  * probably didn't copy this row. However, after the UPDATE the new tuple does
    1211                 :  * satisfy the row filter, so from a data consistency perspective, that row
    1212                 :  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
    1213                 :  * statements have no effect (it matches no row -- see
    1214                 :  * apply_handle_update_internal()). So, the UPDATE should be transformed into an
    1215                 :  * INSERT statement and be sent to the subscriber. However, this might surprise
    1216                 :  * someone who expects the data set to satisfy the row filter expression on the
    1217                 :  * provider.
    1218                 :  */
    1219                 : static bool
    1220 GIC      181949 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
    1221                 :                     TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
    1222                 :                     ReorderBufferChangeType *action)
    1223                 : {
    1224                 :     TupleDesc   desc;
    1225                 :     int         i;
    1226                 :     bool        old_matched,
    1227                 :                 new_matched,
    1228                 :                 result;
    1229                 :     TupleTableSlot *tmp_new_slot;
    1230          181949 :     TupleTableSlot *new_slot = *new_slot_ptr;
    1231                 :     ExprContext *ecxt;
    1232                 :     ExprState  *filter_exprstate;
    1233                 : 
    1234                 :     /*
    1235                 :      * We need this map to avoid relying on ReorderBufferChangeType enums
    1236                 :      * having specific values.  It translates the change type into the index
                         :      * used for entry->exprstate[].
    1237                 :      */
    1238                 :     static const int map_changetype_pubaction[] = {
    1239                 :         [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
    1240                 :         [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
    1241                 :         [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    1242                 :     };
    1243                 : 
    1244          181949 :     Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
    1245                 :            *action == REORDER_BUFFER_CHANGE_UPDATE ||
    1246                 :            *action == REORDER_BUFFER_CHANGE_DELETE);
    1247                 : 
    1248          181949 :     Assert(new_slot || old_slot);
    1249                 : 
    1250                 :     /* Get the row filter cached for this change's publication action */
    1251          181949 :     filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
    1252                 : 
    1253                 :     /* Bail out if there is no row filter: the change is always replicated */
    1254          181949 :     if (!filter_exprstate)
    1255          181917 :         return true;
    1256                 : 
    1257              32 :     elog(DEBUG3, "table \"%s.%s\" has row filter",
    1258                 :          get_namespace_name(RelationGetNamespace(relation)),
    1259                 :          RelationGetRelationName(relation));
    1260                 : 
    1261              32 :     ResetPerTupleExprContext(entry->estate);
    1262                 : 
    1263 CBC          32 :     ecxt = GetPerTupleExprContext(entry->estate);
    1264                 : 
    1265                 :     /*
    1266                 :      * For the following occasions where there is only one tuple, we can
    1267                 :      * evaluate the row filter for that tuple and return.
    1268                 :      *
    1269                 :      * For inserts, we only have the new tuple.
    1270                 :      *
    1271                 :      * For updates, we can have only a new tuple when none of the replica
    1272                 :      * identity columns changed and none of those columns have external data
    1273 ECB             :      * but we still need to evaluate the row filter for the new tuple as the
    1274                 :      * existing values of those columns might not match the filter. Also,
    1275                 :      * users can use constant expressions in the row filter, so we anyway need
    1276                 :      * to evaluate it for the new tuple.
    1277                 :      *
    1278                 :      * For deletes, we only have the old tuple.
                         :      *
                         :      * In all of these cases *action and *new_slot_ptr are left unchanged.
    1279                 :      */
    1280 GIC          32 :     if (!new_slot || !old_slot)
    1281                 :     {
    1282              28 :         ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
    1283              28 :         result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1284                 : 
    1285              28 :         return result;
    1286                 :     }
    1287 ECB             : 
    1288                 :     /*
    1289                 :      * Both the old and new tuples must be valid only for updates and need to
    1290                 :      * be checked against the row filter.
    1291                 :      */
    1292 GIC           4 :     Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
    1293                 : 
    1294 CBC           4 :     slot_getallattrs(new_slot);
    1295 GIC           4 :     slot_getallattrs(old_slot);
    1296                 : 
    1297 CBC           4 :     tmp_new_slot = NULL;
    1298               4 :     desc = RelationGetDescr(relation);
    1299                 : 
    1300 ECB             :     /*
    1301                 :      * The new tuple might not have all the replica identity columns, in which
    1302                 :      * case it needs to be copied over from the old tuple.  The copy is made
                         :      * into tmp_new_slot, which is created lazily on the first column that
                         :      * needs it; new_slot itself is never modified.
    1303                 :      */
    1304 CBC          12 :     for (i = 0; i < desc->natts; i++)
    1305                 :     {
    1306               8 :         Form_pg_attribute att = TupleDescAttr(desc, i);
    1307                 : 
    1308                 :         /*
    1309                 :          * If the column is null in either the new tuple or the old tuple,
                         :          * there is nothing to do.
    1310                 :          */
    1311 GIC           8 :         if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
    1312               1 :             continue;
    1313                 : 
    1314                 :         /*
    1315                 :          * Unchanged toasted replica identity columns are only logged in the
    1316                 :          * old tuple. Copy this over to the new tuple. The changed (or WAL
    1317                 :          * Logged) toast values are always assembled in memory and set as
    1318                 :          * VARTAG_INDIRECT. See ReorderBufferToastReplace.
    1319                 :          */
    1320               7 :         if (att->attlen == -1 &&
    1321               4 :             VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
    1322               1 :             !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
    1323 ECB             :         {
    1324 GIC           1 :             if (!tmp_new_slot)
    1325 ECB             :             {
    1326 CBC           1 :                 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
    1327 GIC           1 :                 ExecClearTuple(tmp_new_slot);
    1328 ECB             : 
    1329 GIC           1 :                 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
    1330               1 :                        desc->natts * sizeof(Datum));
    1331               1 :                 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
    1332               1 :                        desc->natts * sizeof(bool));
    1333                 :             }
    1334                 : 
    1335 CBC           1 :             tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
    1336 GIC           1 :             tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
    1337 ECB             :         }
    1338                 :     }
    1339                 : 
    1340 CBC           4 :     ecxt->ecxt_scantuple = old_slot;
    1341               4 :     old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1342                 : 
    1343 GIC           4 :     if (tmp_new_slot)
    1344                 :     {
    1345               1 :         ExecStoreVirtualTuple(tmp_new_slot);
    1346               1 :         ecxt->ecxt_scantuple = tmp_new_slot;
    1347 ECB             :     }
    1348                 :     else
    1349 CBC           3 :         ecxt->ecxt_scantuple = new_slot;
    1350                 : 
    1351 GIC           4 :     new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1352                 : 
    1353                 :     /*
    1354 ECB             :      * Case 1: if both tuples don't match the row filter, bail out. Send
    1355                 :      * nothing.
    1356                 :      */
    1357 GIC           4 :     if (!old_matched && !new_matched)
    1358 UIC           0 :         return false;
    1359                 : 
    1360                 :     /*
    1361                 :      * Case 2: if the old tuple doesn't satisfy the row filter but the new
    1362                 :      * tuple does, transform the UPDATE into INSERT.
    1363 ECB             :      *
    1364                 :      * Use the newly transformed tuple that must contain the column values for
    1365                 :      * all the replica identity columns. This is required to ensure that,
    1366                 :      * while inserting the tuple in the downstream node, we have all the
    1367                 :      * required column values.
                         :      *
                         :      * tmp_new_slot is only set when a column value was copied over from the
                         :      * old tuple above; otherwise the caller's new slot is left untouched.
    1368                 :      */
    1369 CBC           4 :     if (!old_matched && new_matched)
    1370 ECB             :     {
    1371 GIC           2 :         *action = REORDER_BUFFER_CHANGE_INSERT;
    1372 ECB             : 
    1373 CBC           2 :         if (tmp_new_slot)
    1374               1 :             *new_slot_ptr = tmp_new_slot;
    1375 ECB             :     }
    1376                 : 
    1377                 :     /*
    1378                 :      * Case 3: if the old tuple satisfies the row filter but the new tuple
    1379                 :      * doesn't, transform the UPDATE into DELETE.
    1380                 :      *
    1381                 :      * This transformation does not require another tuple. The old tuple will
    1382                 :      * be used for DELETE.
    1383                 :      */
    1384 CBC           2 :     else if (old_matched && !new_matched)
    1385 GIC           1 :         *action = REORDER_BUFFER_CHANGE_DELETE;
    1386 ECB             : 
    1387                 :     /*
    1388                 :      * Case 4: if both tuples match the row filter, transformation isn't
    1389                 :      * required. (*action keeps the UPDATE it was passed in with.)
    1390                 :      */
    1391                 : 
    1392 CBC           4 :     return true;
    1393                 : }
    1394 ECB             : 
    1395                 : /*
    1396                 :  * Sends the decoded DML over wire.
    1397                 :  *
    1398                 :  * This is called both in streaming and non-streaming modes.
    1399                 :  */
    1400                 : static void
    1401 GBC      183090 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1402                 :                 Relation relation, ReorderBufferChange *change)
    1403                 : {
    1404 GIC      183090 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1405          183090 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1406                 :     MemoryContext old;
    1407                 :     RelationSyncEntry *relentry;
    1408          183090 :     TransactionId xid = InvalidTransactionId;
    1409          183090 :     Relation    ancestor = NULL;
    1410          183090 :     Relation    targetrel = relation;
    1411          183090 :     ReorderBufferChangeType action = change->action;
    1412 CBC      183090 :     TupleTableSlot *old_slot = NULL;
    1413 GIC      183090 :     TupleTableSlot *new_slot = NULL;
    1414 ECB             : 
    1415 CBC      183090 :     if (!is_publishable_relation(relation))
    1416 GIC        1138 :         return;
    1417                 : 
    1418                 :     /*
    1419                 :      * Remember the xid for the change in streaming mode. We need to send xid
    1420                 :      * with each change in the streaming mode so that subscriber can make
    1421                 :      * their association and on aborts, it can discard the corresponding
    1422                 :      * changes.
    1423                 :      */
    1424          183088 :     if (in_streaming)
    1425 CBC      175901 :         xid = change->txn->xid;
    1426 ECB             : 
    1427 GIC      183088 :     relentry = get_rel_sync_entry(data, relation);
    1428                 : 
    1429                 :     /* First check the table filter */
    1430          183085 :     switch (action)
    1431                 :     {
    1432          105738 :         case REORDER_BUFFER_CHANGE_INSERT:
    1433 CBC      105738 :             if (!relentry->pubactions.pubinsert)
    1434 GIC          41 :                 return;
    1435          105697 :             break;
    1436           34446 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1437           34446 :             if (!relentry->pubactions.pubupdate)
    1438              41 :                 return;
    1439           34405 :             break;
    1440           42901 :         case REORDER_BUFFER_CHANGE_DELETE:
    1441           42901 :             if (!relentry->pubactions.pubdelete)
    1442 CBC        1054 :                 return;
    1443                 : 
    1444                 :             /*
    1445                 :              * This is only possible if deletes are allowed even when replica
    1446                 :              * identity is not defined for a table. Since the DELETE action
    1447                 :              * can't be published, we simply return.
    1448                 :              */
    1449 GNC       41847 :             if (!change->data.tp.oldtuple)
    1450                 :             {
    1451 UNC           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
    1452               0 :                 return;
    1453                 :             }
    1454 GIC       41847 :             break;
    1455 UIC           0 :         default:
    1456 LBC           0 :             Assert(false);
    1457 ECB             :     }
    1458                 : 
    1459                 :     /* Avoid leaking memory by using and resetting our own context */
    1460 CBC      181949 :     old = MemoryContextSwitchTo(data->context);
    1461 ECB             : 
    1462                 :     /* Switch relation if publishing via root. */
    1463 GNC      181949 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
    1464                 :     {
    1465              47 :         Assert(relation->rd_rel->relispartition);
    1466              47 :         ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1467              47 :         targetrel = ancestor;
    1468                 :     }
    1469                 : 
    1470          181949 :     if (change->data.tp.oldtuple)
    1471                 :     {
    1472           41957 :         old_slot = relentry->old_slot;
    1473           41957 :         ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
    1474                 : 
    1475                 :         /* Convert tuple if needed. */
    1476           41957 :         if (relentry->attrmap)
    1477                 :         {
    1478 UNC           0 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1479                 :                                                       &TTSOpsVirtual);
    1480                 : 
    1481               0 :             old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
    1482                 :         }
    1483                 :     }
    1484                 : 
    1485 GNC      181949 :     if (change->data.tp.newtuple)
    1486                 :     {
    1487          140102 :         new_slot = relentry->new_slot;
    1488          140102 :         ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
    1489                 : 
    1490                 :         /* Convert tuple if needed. */
    1491          140102 :         if (relentry->attrmap)
    1492                 :         {
    1493               8 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1494                 :                                                       &TTSOpsVirtual);
    1495                 : 
    1496               8 :             new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
    1497                 :         }
    1498                 :     }
    1499                 : 
    1500                 :     /*
    1501                 :      * Check row filter.
    1502                 :      *
    1503                 :      * Updates could be transformed to inserts or deletes based on the results
    1504                 :      * of the row filter for old and new tuple.
    1505                 :      */
    1506          181949 :     if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
    1507              11 :         goto cleanup;
    1508                 : 
    1509                 :     /*
    1510                 :      * Send BEGIN if we haven't yet.
    1511                 :      *
    1512                 :      * We send the BEGIN message after ensuring that we will actually send the
    1513                 :      * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
    1514                 :      * transactions.
    1515                 :      */
    1516          181938 :     if (txndata && !txndata->sent_begin_txn)
    1517             326 :         pgoutput_send_begin(ctx, txn);
    1518                 : 
    1519                 :     /*
    1520                 :      * Schema should be sent using the original relation because it also sends
    1521                 :      * the ancestor's relation.
    1522                 :      */
    1523          181938 :     maybe_send_schema(ctx, change, relation, relentry);
    1524                 : 
    1525          181938 :     OutputPluginPrepareWrite(ctx, true);
    1526                 : 
    1527 ECB             :     /* Send the data */
    1528 CBC      181938 :     switch (action)
    1529 ECB             :     {
    1530 CBC      105688 :         case REORDER_BUFFER_CHANGE_INSERT:
    1531 GIC      105688 :             logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
    1532          105688 :                                     data->binary, relentry->columns);
    1533 CBC      105688 :             break;
    1534 GIC       34402 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1535 GNC       34402 :             logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
    1536           34402 :                                     new_slot, data->binary, relentry->columns);
    1537 CBC       34402 :             break;
    1538           41848 :         case REORDER_BUFFER_CHANGE_DELETE:
    1539 GNC       41848 :             logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
    1540           41848 :                                     data->binary, relentry->columns);
    1541 CBC       41848 :             break;
    1542 UIC           0 :         default:
    1543 LBC           0 :             Assert(false);
    1544                 :     }
    1545 ECB             : 
    1546 GNC      181938 :     OutputPluginWrite(ctx, true);
    1547                 : 
    1548          181948 : cleanup:
    1549 CBC      181948 :     if (RelationIsValid(ancestor))
    1550                 :     {
    1551              47 :         RelationClose(ancestor);
    1552 GBC          47 :         ancestor = NULL;
    1553                 :     }
    1554 ECB             : 
    1555 CBC      181948 :     MemoryContextSwitchTo(old);
    1556          181948 :     MemoryContextReset(data->context);
    1557                 : }
    1558                 : 
    1559                 : static void
    1560 GIC          11 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1561                 :                   int nrelations, Relation relations[], ReorderBufferChange *change)
    1562 ECB             : {
    1563 CBC          11 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1564 GBC          11 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1565                 :     MemoryContext old;
    1566 ECB             :     RelationSyncEntry *relentry;
    1567                 :     int         i;
    1568                 :     int         nrelids;
    1569                 :     Oid        *relids;
    1570 CBC          11 :     TransactionId xid = InvalidTransactionId;
    1571                 : 
    1572                 :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
    1573              11 :     if (in_streaming)
    1574 UIC           0 :         xid = change->txn->xid;
    1575 ECB             : 
    1576 CBC          11 :     old = MemoryContextSwitchTo(data->context);
    1577                 : 
    1578 GIC          11 :     relids = palloc0(nrelations * sizeof(Oid));
    1579              11 :     nrelids = 0;
    1580 ECB             : 
    1581 CBC          31 :     for (i = 0; i < nrelations; i++)
    1582 ECB             :     {
    1583 GIC          20 :         Relation    relation = relations[i];
    1584              20 :         Oid         relid = RelationGetRelid(relation);
    1585 ECB             : 
    1586 CBC          20 :         if (!is_publishable_relation(relation))
    1587 LBC           0 :             continue;
    1588                 : 
    1589 GIC          20 :         relentry = get_rel_sync_entry(data, relation);
    1590 ECB             : 
    1591 GIC          20 :         if (!relentry->pubactions.pubtruncate)
    1592               8 :             continue;
    1593                 : 
    1594 ECB             :         /*
    1595                 :          * Don't send partitions if the publication wants to send only the
    1596                 :          * root tables through it.
    1597                 :          */
    1598 CBC          12 :         if (relation->rd_rel->relispartition &&
    1599 GIC          10 :             relentry->publish_as_relid != relid)
    1600 UIC           0 :             continue;
    1601                 : 
    1602 GIC          12 :         relids[nrelids++] = relid;
    1603                 : 
    1604 ECB             :         /* Send BEGIN if we haven't yet */
    1605 GBC          12 :         if (txndata && !txndata->sent_begin_txn)
    1606 GIC           7 :             pgoutput_send_begin(ctx, txn);
    1607                 : 
    1608              12 :         maybe_send_schema(ctx, change, relation, relentry);
    1609                 :     }
    1610 ECB             : 
    1611 GIC          11 :     if (nrelids > 0)
    1612 ECB             :     {
    1613 GIC           7 :         OutputPluginPrepareWrite(ctx, true);
    1614               7 :         logicalrep_write_truncate(ctx->out,
    1615 ECB             :                                   xid,
    1616                 :                                   nrelids,
    1617                 :                                   relids,
    1618 GIC           7 :                                   change->data.truncate.cascade,
    1619 CBC           7 :                                   change->data.truncate.restart_seqs);
    1620               7 :         OutputPluginWrite(ctx, true);
    1621                 :     }
    1622                 : 
    1623 GIC          11 :     MemoryContextSwitchTo(old);
    1624              11 :     MemoryContextReset(data->context);
    1625              11 : }
    1626                 : 
    1627 ECB             : static void
    1628 GIC           6 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1629                 :                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
    1630                 :                  const char *message)
    1631                 : {
    1632               6 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1633               6 :     TransactionId xid = InvalidTransactionId;
    1634                 : 
    1635               6 :     if (!data->messages)
    1636 CBC           1 :         return;
    1637 ECB             : 
    1638                 :     /*
    1639                 :      * Remember the xid for the message in streaming mode. See
    1640                 :      * pgoutput_change.
    1641                 :      */
    1642 GIC           5 :     if (in_streaming)
    1643 UIC           0 :         xid = txn->xid;
    1644                 : 
    1645                 :     /*
    1646                 :      * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
    1647                 :      */
    1648 GIC           5 :     if (transactional)
    1649                 :     {
    1650 CBC           2 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1651                 : 
    1652 ECB             :         /* Send BEGIN if we haven't yet */
    1653 GIC           2 :         if (txndata && !txndata->sent_begin_txn)
    1654 CBC           2 :             pgoutput_send_begin(ctx, txn);
    1655 ECB             :     }
    1656                 : 
    1657 CBC           5 :     OutputPluginPrepareWrite(ctx, true);
    1658 GIC           5 :     logicalrep_write_message(ctx->out,
    1659                 :                              xid,
    1660                 :                              message_lsn,
    1661                 :                              transactional,
    1662                 :                              prefix,
    1663 ECB             :                              sz,
    1664                 :                              message);
    1665 CBC           5 :     OutputPluginWrite(ctx, true);
    1666                 : }
    1667                 : 
    1668 ECB             : /*
    1669                 :  * Return true if the data is associated with an origin and the user has
    1670                 :  * requested the changes that don't have an origin, false otherwise.
    1671                 :  */
    1672                 : static bool
    1673 GIC      490622 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
    1674 ECB             :                        RepOriginId origin_id)
    1675                 : {
    1676 GNC      490622 :     if (publish_no_origin && origin_id != InvalidRepOriginId)
    1677              46 :         return true;
    1678                 : 
    1679 GIC      490576 :     return false;
    1680 ECB             : }
    1681                 : 
    1682                 : /*
    1683                 :  * Shutdown the output plugin.
    1684                 :  *
    1685                 :  * Note, we don't need to clean the data->context and data->cachectx as
    1686                 :  * they are child contexts of the ctx->context so they will be cleaned up by
    1687                 :  * logical decoding machinery.
    1688                 :  */
    1689                 : static void
    1690 GIC         384 : pgoutput_shutdown(LogicalDecodingContext *ctx)
    1691 ECB             : {
    1692 GIC         384 :     if (RelationSyncCache)
    1693                 :     {
    1694             154 :         hash_destroy(RelationSyncCache);
    1695             154 :         RelationSyncCache = NULL;
    1696                 :     }
    1697 CBC         384 : }
    1698 ECB             : 
    1699                 : /*
    1700                 :  * Load publications from the list of publication names.
    1701                 :  */
    1702                 : static List *
    1703 GIC         128 : LoadPublications(List *pubnames)
    1704 ECB             : {
    1705 GIC         128 :     List       *result = NIL;
    1706                 :     ListCell   *lc;
    1707 ECB             : 
    1708 GIC         297 :     foreach(lc, pubnames)
    1709                 :     {
    1710 CBC         171 :         char       *pubname = (char *) lfirst(lc);
    1711 GIC         171 :         Publication *pub = GetPublicationByName(pubname, false);
    1712                 : 
    1713             169 :         result = lappend(result, pub);
    1714                 :     }
    1715                 : 
    1716 CBC         126 :     return result;
    1717 ECB             : }
    1718                 : 
    1719                 : /*
    1720                 :  * Publication syscache invalidation callback.
    1721                 :  *
    1722                 :  * Called for invalidations on pg_publication.
    1723                 :  */
    1724                 : static void
    1725 CBC         222 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    1726                 : {
    1727 GIC         222 :     publications_valid = false;
    1728 ECB             : 
    1729                 :     /*
    1730                 :      * Also invalidate per-relation cache so that next time the filtering info
    1731                 :      * is checked it will be updated with the new publication settings.
    1732                 :      */
    1733 GIC         222 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
    1734             222 : }
    1735 ECB             : 
    1736                 : /*
    1737                 :  * START STREAM callback
    1738                 :  */
    1739                 : static void
    1740 GIC         595 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
    1741 ECB             :                       ReorderBufferTXN *txn)
    1742                 : {
    1743 CBC         595 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
    1744                 : 
    1745                 :     /* we can't nest streaming of transactions */
    1746             595 :     Assert(!in_streaming);
    1747 ECB             : 
    1748                 :     /*
    1749                 :      * If we already sent the first stream for this transaction then don't
    1750                 :      * send the origin id in the subsequent streams.
    1751                 :      */
    1752 GIC         595 :     if (rbtxn_is_streamed(txn))
    1753             540 :         send_replication_origin = false;
    1754 ECB             : 
    1755 GIC         595 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
    1756             595 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
    1757                 : 
    1758             595 :     send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
    1759 ECB             :                      send_replication_origin);
    1760                 : 
    1761 GIC         595 :     OutputPluginWrite(ctx, true);
    1762                 : 
    1763                 :     /* we're streaming a chunk of transaction now */
    1764             595 :     in_streaming = true;
    1765             595 : }
    1766 ECB             : 
    1767                 : /*
    1768                 :  * STOP STREAM callback
    1769                 :  */
    1770                 : static void
    1771 CBC         595 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
    1772                 :                      ReorderBufferTXN *txn)
    1773 ECB             : {
    1774                 :     /* we should be streaming a transaction */
    1775 GIC         595 :     Assert(in_streaming);
    1776                 : 
    1777 CBC         595 :     OutputPluginPrepareWrite(ctx, true);
    1778 GIC         595 :     logicalrep_write_stream_stop(ctx->out);
    1779 CBC         595 :     OutputPluginWrite(ctx, true);
    1780 ECB             : 
    1781                 :     /* we've stopped streaming a transaction */
    1782 GIC         595 :     in_streaming = false;
    1783             595 : }
    1784                 : 
    1785                 : /*
    1786                 :  * Notify downstream to discard the streamed transaction (along with all
    1787 ECB             :  * its subtransactions, if it's a toplevel transaction).
    1788                 :  */
    1789                 : static void
    1790 GIC          26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    1791                 :                       ReorderBufferTXN *txn,
    1792                 :                       XLogRecPtr abort_lsn)
    1793                 : {
    1794                 :     ReorderBufferTXN *toptxn;
    1795 GNC          26 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1796              26 :     bool        write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
    1797 ECB             : 
    1798                 :     /*
    1799                 :      * The abort should happen outside streaming block, even for streamed
    1800                 :      * transactions. The transaction has to be marked as streamed, though.
    1801                 :      */
    1802 CBC          26 :     Assert(!in_streaming);
    1803 ECB             : 
    1804                 :     /* determine the toplevel transaction */
    1805 GNC          26 :     toptxn = rbtxn_get_toptxn(txn);
    1806 ECB             : 
    1807 CBC          26 :     Assert(rbtxn_is_streamed(toptxn));
    1808                 : 
    1809 GIC          26 :     OutputPluginPrepareWrite(ctx, true);
    1810 GNC          26 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
    1811                 :                                   txn->xact_time.abort_time, write_abort_info);
    1812                 : 
    1813 GIC          26 :     OutputPluginWrite(ctx, true);
    1814                 : 
    1815              26 :     cleanup_rel_sync_cache(toptxn->xid, false);
    1816              26 : }
    1817 ECB             : 
    1818                 : /*
    1819                 :  * Notify downstream to apply the streamed transaction (along with all
    1820                 :  * its subtransactions).
    1821                 :  */
    1822                 : static void
    1823 CBC          44 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    1824 ECB             :                        ReorderBufferTXN *txn,
    1825                 :                        XLogRecPtr commit_lsn)
    1826                 : {
    1827                 :     /*
    1828                 :      * The commit should happen outside streaming block, even for streamed
    1829                 :      * transactions. The transaction has to be marked as streamed, though.
    1830                 :      */
    1831 GIC          44 :     Assert(!in_streaming);
    1832              44 :     Assert(rbtxn_is_streamed(txn));
    1833                 : 
    1834 GNC          44 :     OutputPluginUpdateProgress(ctx, false);
    1835                 : 
    1836 GIC          44 :     OutputPluginPrepareWrite(ctx, true);
    1837 CBC          44 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
    1838 GIC          44 :     OutputPluginWrite(ctx, true);
    1839                 : 
    1840              44 :     cleanup_rel_sync_cache(txn->xid, true);
    1841              44 : }
    1842                 : 
    1843 ECB             : /*
    1844 EUB             :  * PREPARE callback (for streaming two-phase commit).
    1845                 :  *
    1846                 :  * Notify the downstream to prepare the transaction.
    1847 ECB             :  */
    1848                 : static void
    1849 CBC           8 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
    1850                 :                             ReorderBufferTXN *txn,
    1851 ECB             :                             XLogRecPtr prepare_lsn)
    1852                 : {
    1853 GIC           8 :     Assert(rbtxn_is_streamed(txn));
    1854                 : 
    1855 GNC           8 :     OutputPluginUpdateProgress(ctx, false);
    1856 GIC           8 :     OutputPluginPrepareWrite(ctx, true);
    1857               8 :     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
    1858 CBC           8 :     OutputPluginWrite(ctx, true);
    1859 GBC           8 : }
    1860                 : 
    1861                 : /*
    1862 ECB             :  * Initialize the relation schema sync cache for a decoding session.
    1863                 :  *
    1864                 :  * The hash table is destroyed at the end of a decoding session. While
    1865                 :  * relcache invalidations still exist and will still be invoked, they
    1866                 :  * will just see the null hash table global and take no action.
    1867                 :  */
    1868                 : static void
    1869 GIC         281 : init_rel_sync_cache(MemoryContext cachectx)
    1870                 : {
    1871                 :     HASHCTL     ctl;
    1872                 :     static bool relation_callbacks_registered = false;
    1873                 : 
    1874                 :     /* Nothing to do if hash table already exists */
    1875             281 :     if (RelationSyncCache != NULL)
    1876 UIC           0 :         return;
    1877 ECB             : 
    1878                 :     /* Make a new hash table for the cache */
    1879 GIC         281 :     ctl.keysize = sizeof(Oid);
    1880 CBC         281 :     ctl.entrysize = sizeof(RelationSyncEntry);
    1881 GIC         281 :     ctl.hcxt = cachectx;
    1882                 : 
    1883             281 :     RelationSyncCache = hash_create("logical replication output relation cache",
    1884 ECB             :                                     128, &ctl,
    1885                 :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    1886                 : 
    1887 GIC         281 :     Assert(RelationSyncCache != NULL);
    1888                 : 
    1889                 :     /* No more to do if we already registered callbacks */
    1890             281 :     if (relation_callbacks_registered)
    1891 LBC           0 :         return;
    1892                 : 
    1893                 :     /* We must update the cache entry for a relation after a relcache flush */
    1894 CBC         281 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
    1895                 : 
    1896                 :     /*
    1897                 :      * Flush all cache entries after a pg_namespace change, in case it was a
    1898                 :      * schema rename affecting a relation being replicated.
    1899                 :      */
    1900 GNC         281 :     CacheRegisterSyscacheCallback(NAMESPACEOID,
    1901                 :                                   rel_sync_cache_publication_cb,
    1902                 :                                   (Datum) 0);
    1903                 : 
    1904                 :     /*
    1905                 :      * Flush all cache entries after any publication changes.  (We need no
    1906                 :      * callback entry for pg_publication, because publication_invalidation_cb
    1907                 :      * will take care of it.)
    1908                 :      */
    1909 GIC         281 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
    1910                 :                                   rel_sync_cache_publication_cb,
    1911                 :                                   (Datum) 0);
    1912             281 :     CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
    1913                 :                                   rel_sync_cache_publication_cb,
    1914                 :                                   (Datum) 0);
    1915                 : 
    1916 CBC         281 :     relation_callbacks_registered = true;
    1917                 : }
    1918                 : 
    1919                 : /*
    1920 ECB             :  * We expect a relatively small number of streamed transactions.
    1921                 :  */
    1922                 : static bool
    1923 GIC      175901 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1924 ECB             : {
    1925 GNC      175901 :     return list_member_xid(entry->streamed_txns, xid);
    1926                 : }
    1927                 : 
    1928                 : /*
    1929 ECB             :  * Add the xid in the rel sync entry for which we have already sent the schema
    1930                 :  * of the relation.
    1931                 :  */
    1932                 : static void
    1933 GIC          64 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1934 ECB             : {
    1935                 :     MemoryContext oldctx;
    1936                 : 
    1937 GIC          64 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1938                 : 
    1939 GNC          64 :     entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    1940                 : 
    1941 GIC          64 :     MemoryContextSwitchTo(oldctx);
    1942 CBC          64 : }
    1943                 : 
    1944                 : /*
    1945 ECB             :  * Find or create entry in the relation schema cache.
    1946                 :  *
    1947                 :  * This looks up publications that the given relation is directly or
    1948                 :  * indirectly part of (the latter if it's really the relation's ancestor that
    1949                 :  * is part of a publication) and fills up the found entry with the information
    1950                 :  * about which operations to publish and whether to use an ancestor's schema
    1951                 :  * when publishing.
    1952                 :  */
    1953                 : static RelationSyncEntry *
    1954 CBC      183108 : get_rel_sync_entry(PGOutputData *data, Relation relation)
    1955 ECB             : {
    1956                 :     RelationSyncEntry *entry;
    1957                 :     bool        found;
    1958                 :     MemoryContext oldctx;
    1959 GIC      183108 :     Oid         relid = RelationGetRelid(relation);
    1960                 : 
    1961          183108 :     Assert(RelationSyncCache != NULL);
    1962 ECB             : 
    1963                 :     /* Find cached relation info, creating if not found */
    1964 CBC      183108 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
    1965                 :                                               &relid,
    1966                 :                                               HASH_ENTER, &found);
    1967 GIC      183108 :     Assert(entry != NULL);
    1968                 : 
    1969                 :     /* initialize entry, if it's new */
    1970          183108 :     if (!found)
    1971                 :     {
    1972 CBC         207 :         entry->replicate_valid = false;
    1973 GIC         207 :         entry->schema_sent = false;
    1974 CBC         207 :         entry->streamed_txns = NIL;
    1975             207 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
    1976             207 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
    1977             207 :         entry->new_slot = NULL;
    1978             207 :         entry->old_slot = NULL;
    1979 GIC         207 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    1980             207 :         entry->entry_cxt = NULL;
    1981 CBC         207 :         entry->publish_as_relid = InvalidOid;
    1982 GIC         207 :         entry->columns = NULL;
    1983 CBC         207 :         entry->attrmap = NULL;
    1984 ECB             :     }
    1985                 : 
    1986                 :     /* Validate the entry */
    1987 CBC      183108 :     if (!entry->replicate_valid)
    1988                 :     {
    1989             260 :         Oid         schemaId = get_rel_namespace(relid);
    1990             260 :         List       *pubids = GetRelationPublications(relid);
    1991 ECB             : 
    1992                 :         /*
    1993                 :          * We don't acquire a lock on the namespace system table as we build
    1994                 :          * the cache entry using a historic snapshot and all the later changes
    1995                 :          * are absorbed while decoding WAL.
    1996                 :          */
    1997 GIC         260 :         List       *schemaPubids = GetSchemaPublications(schemaId);
    1998                 :         ListCell   *lc;
    1999             260 :         Oid         publish_as_relid = relid;
    2000 CBC         260 :         int         publish_ancestor_level = 0;
    2001             260 :         bool        am_partition = get_rel_relispartition(relid);
    2002             260 :         char        relkind = get_rel_relkind(relid);
    2003             260 :         List       *rel_publications = NIL;
    2004 ECB             : 
    2005                 :         /* Reload publications if needed before use. */
    2006 CBC         260 :         if (!publications_valid)
    2007 ECB             :         {
    2008 CBC         128 :             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    2009 GIC         128 :             if (data->publications)
    2010                 :             {
    2011              19 :                 list_free_deep(data->publications);
    2012              19 :                 data->publications = NIL;
    2013 ECB             :             }
    2014 CBC         128 :             data->publications = LoadPublications(data->publication_names);
    2015             126 :             MemoryContextSwitchTo(oldctx);
    2016             126 :             publications_valid = true;
    2017                 :         }
    2018 ECB             : 
    2019                 :         /*
    2020                 :          * Reset schema_sent status as the relation definition may have
    2021                 :          * changed.  Also reset pubactions to empty in case rel was dropped
    2022 EUB             :          * from a publication.  Also free any objects that depended on the
    2023 ECB             :          * earlier definition.
    2024                 :          */
    2025 GIC         258 :         entry->schema_sent = false;
    2026             258 :         list_free(entry->streamed_txns);
    2027             258 :         entry->streamed_txns = NIL;
    2028 CBC         258 :         bms_free(entry->columns);
    2029             258 :         entry->columns = NULL;
    2030 GIC         258 :         entry->pubactions.pubinsert = false;
    2031 CBC         258 :         entry->pubactions.pubupdate = false;
    2032             258 :         entry->pubactions.pubdelete = false;
    2033             258 :         entry->pubactions.pubtruncate = false;
    2034                 : 
    2035                 :         /*
    2036                 :          * Tuple slots cleanups. (Will be rebuilt later if needed).
    2037                 :          */
    2038 GIC         258 :         if (entry->old_slot)
    2039              49 :             ExecDropSingleTupleTableSlot(entry->old_slot);
    2040             258 :         if (entry->new_slot)
    2041 CBC          49 :             ExecDropSingleTupleTableSlot(entry->new_slot);
    2042                 : 
    2043             258 :         entry->old_slot = NULL;
    2044             258 :         entry->new_slot = NULL;
    2045                 : 
    2046 GIC         258 :         if (entry->attrmap)
    2047 UIC           0 :             free_attrmap(entry->attrmap);
    2048 GIC         258 :         entry->attrmap = NULL;
    2049                 : 
    2050                 :         /*
    2051 ECB             :          * Row filter cache cleanups.
    2052                 :          */
    2053 GIC         258 :         if (entry->entry_cxt)
    2054              10 :             MemoryContextDelete(entry->entry_cxt);
    2055                 : 
    2056             258 :         entry->entry_cxt = NULL;
    2057             258 :         entry->estate = NULL;
    2058 CBC         258 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2059                 : 
    2060 ECB             :         /*
    2061                 :          * Build publication cache. We can't use one provided by relcache as
    2062                 :          * relcache considers all publications that the given relation is in,
    2063                 :          * but here we only need to consider ones that the subscriber
    2064                 :          * requested.
    2065                 :          */
    2066 CBC         663 :         foreach(lc, data->publications)
    2067                 :         {
    2068 GIC         405 :             Publication *pub = lfirst(lc);
    2069             405 :             bool        publish = false;
    2070 ECB             : 
    2071                 :             /*
    2072                 :              * Under what relid should we publish changes in this publication?
    2073                 :              * We'll use the top-most relid across all publications. Also
    2074                 :              * track the ancestor level for this publication.
    2075                 :              */
    2076 GIC         405 :             Oid         pub_relid = relid;
    2077             405 :             int         ancestor_level = 0;
    2078                 : 
    2079                 :             /*
    2080 ECB             :              * If this is a FOR ALL TABLES publication, pick the partition
    2081                 :              * root and set the ancestor level accordingly.
    2082                 :              */
    2083 GIC         405 :             if (pub->alltables)
    2084 ECB             :             {
    2085 GIC          62 :                 publish = true;
    2086 CBC          62 :                 if (pub->pubviaroot && am_partition)
    2087                 :                 {
    2088 GIC          10 :                     List       *ancestors = get_partition_ancestors(relid);
    2089                 : 
    2090 CBC          10 :                     pub_relid = llast_oid(ancestors);
    2091 GIC          10 :                     ancestor_level = list_length(ancestors);
    2092 ECB             :                 }
    2093                 :             }
    2094                 : 
    2095 CBC         405 :             if (!publish)
    2096 ECB             :             {
    2097 GIC         343 :                 bool        ancestor_published = false;
    2098                 : 
    2099                 :                 /*
    2100                 :                  * For a partition, check if any of the ancestors are
    2101 ECB             :                  * published.  If so, note down the topmost ancestor that is
    2102                 :                  * published via this publication, which will be used as the
    2103                 :                  * relation via which to publish the partition's changes.
    2104                 :                  */
    2105 GIC         343 :                 if (am_partition)
    2106                 :                 {
    2107                 :                     Oid         ancestor;
    2108                 :                     int         level;
    2109              99 :                     List       *ancestors = get_partition_ancestors(relid);
    2110                 : 
    2111              99 :                     ancestor = GetTopMostAncestorInPublication(pub->oid,
    2112                 :                                                                ancestors,
    2113                 :                                                                &level);
    2114                 : 
    2115 CBC          99 :                     if (ancestor != InvalidOid)
    2116 ECB             :                     {
    2117 GIC          40 :                         ancestor_published = true;
    2118 CBC          40 :                         if (pub->pubviaroot)
    2119 ECB             :                         {
    2120 CBC          17 :                             pub_relid = ancestor;
    2121              17 :                             ancestor_level = level;
    2122                 :                         }
    2123                 :                     }
    2124                 :                 }
    2125                 : 
    2126 GIC         540 :                 if (list_member_oid(pubids, pub->oid) ||
    2127             388 :                     list_member_oid(schemaPubids, pub->oid) ||
    2128                 :                     ancestor_published)
    2129             177 :                     publish = true;
    2130 ECB             :             }
    2131                 : 
    2132                 :             /*
    2133                 :              * If the relation is to be published, determine actions to
    2134                 :              * publish, and list of columns, if appropriate.
    2135                 :              *
    2136                 :              * Don't publish changes for partitioned tables, because
    2137                 :              * publishing those of its partitions suffices, unless partition
    2138                 :              * changes won't be published due to pubviaroot being set.
    2139                 :              */
    2140 CBC         405 :             if (publish &&
    2141               3 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    2142                 :             {
    2143 GIC         236 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    2144 CBC         236 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    2145 GIC         236 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    2146             236 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    2147                 : 
    2148                 :                 /*
    2149 ECB             :                  * We want to publish the changes as the top-most ancestor
    2150                 :                  * across all publications. So we need to check if the already
    2151                 :                  * calculated level is higher than the new one. If yes, we can
    2152                 :                  * ignore the new value (as it's a child). Otherwise the new
    2153                 :                  * value is an ancestor, so we keep it.
    2154                 :                  */
    2155 GIC         236 :                 if (publish_ancestor_level > ancestor_level)
    2156               1 :                     continue;
    2157 ECB             : 
    2158                 :                 /*
    2159                 :                  * If we found an ancestor higher up in the tree, discard the
    2160                 :                  * list of publications through which we replicate it, and use
    2161                 :                  * the new ancestor.
    2162                 :                  */
    2163 CBC         235 :                 if (publish_ancestor_level < ancestor_level)
    2164 ECB             :                 {
    2165 GIC          27 :                     publish_as_relid = pub_relid;
    2166              27 :                     publish_ancestor_level = ancestor_level;
    2167 ECB             : 
    2168                 :                     /* reset the publication list for this relation */
    2169 GIC          27 :                     rel_publications = NIL;
    2170 ECB             :                 }
    2171                 :                 else
    2172                 :                 {
    2173                 :                     /* Same ancestor level, has to be the same OID. */
    2174 GIC         208 :                     Assert(publish_as_relid == pub_relid);
    2175                 :                 }
    2176 ECB             : 
    2177                 :                 /* Track publications for this ancestor. */
    2178 CBC         235 :                 rel_publications = lappend(rel_publications, pub);
    2179                 :             }
    2180 ECB             :         }
    2181                 : 
    2182 GIC         258 :         entry->publish_as_relid = publish_as_relid;
    2183 ECB             : 
    2184                 :         /*
    2185                 :          * Initialize the tuple slot, map, and row filter. These are only used
    2186                 :          * when publishing inserts, updates, or deletes.
    2187                 :          */
    2188 GIC         258 :         if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
    2189              36 :             entry->pubactions.pubdelete)
    2190                 :         {
    2191                 :             /* Initialize the tuple slot and map */
    2192             222 :             init_tuple_slot(data, relation, entry);
    2193                 : 
    2194                 :             /* Initialize the row filter */
    2195             222 :             pgoutput_row_filter_init(data, rel_publications, entry);
    2196                 : 
    2197                 :             /* Initialize the column list */
    2198 CBC         222 :             pgoutput_column_list_init(data, rel_publications, entry);
    2199                 :         }
    2200                 : 
    2201 GIC         257 :         list_free(pubids);
    2202             257 :         list_free(schemaPubids);
    2203             257 :         list_free(rel_publications);
    2204 ECB             : 
    2205 GIC         257 :         entry->replicate_valid = true;
    2206 ECB             :     }
    2207                 : 
    2208 GIC      183105 :     return entry;
    2209                 : }
    2210                 : 
    2211                 : /*
    2212                 :  * Cleanup list of streamed transactions and update the schema_sent flag.
    2213                 :  *
    2214                 :  * When a streamed transaction commits or aborts, we need to remove the
    2215 ECB             :  * toplevel XID from the schema cache. If the transaction aborted, the
    2216                 :  * subscriber will simply throw away the schema records we streamed, so
    2217                 :  * we don't need to do anything else.
    2218                 :  *
    2219                 :  * If the transaction is committed, the subscriber will update the relation
    2220                 :  * cache - so tweak the schema_sent flag accordingly.
                         :  *
                         :  * xid is compared against the XIDs stored in the streamed_txns list of
                         :  * every entry in RelationSyncCache.  is_commit tells whether that
                         :  * transaction committed (true) or aborted (false).
    2221                 :  */
    2222                 : static void
    2223 CBC          70 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    2224 ECB             : {
    2225                 :     HASH_SEQ_STATUS hash_seq;
    2226                 :     RelationSyncEntry *entry;
    2227                 :     ListCell   *lc;
    2228                 : 
    2229 GIC          70 :     Assert(RelationSyncCache != NULL);
    2230                 : 
    2231              70 :     hash_seq_init(&hash_seq, RelationSyncCache);
    2232             143 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
    2233                 :     {
    2234 ECB             :         /*
    2235                 :          * We can set the schema_sent flag for an entry that has committed xid
    2236                 :          * in the list as that ensures that the subscriber would have the
    2237                 :          * corresponding schema and we don't need to send it unless there is
    2238                 :          * any invalidation for that relation.
                         :          *
                         :          * Whether or not the transaction committed, the matching XID is
                         :          * removed from the entry's list, and the scan of this entry stops
                         :          * at the first match.
    2239                 :          */
    2240 GIC          91 :         foreach(lc, entry->streamed_txns)
    2241                 :         {
    2242 GNC          70 :             if (xid == lfirst_xid(lc))
    2243 ECB             :             {
    2244 CBC          52 :                 if (is_commit)
    2245 GIC          41 :                     entry->schema_sent = true;
    2246                 : 
    2247              52 :                 entry->streamed_txns =
    2248              52 :                     foreach_delete_current(entry->streamed_txns, lc);
    2249              52 :                 break;
    2250                 :             }
    2251                 :         }
    2252                 :     }
    2253              70 : }
    2254 ECB             : 
    2255                 : /*
    2256                 :  * Relcache invalidation callback
                         :  *
                         :  * Marks cached RelationSyncCache entries as needing a rebuild by clearing
                         :  * their replicate_valid flag.  A valid relid affects only that relation's
                         :  * entry, if one is cached; an invalid relid affects every entry.  The
                         :  * arg parameter is not used here.
    2257                 :  */
    2258                 : static void
    2259 GIC        3135 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    2260 ECB             : {
    2261                 :     RelationSyncEntry *entry;
    2262                 : 
    2263                 :     /*
    2264                 :      * We can get here if the plugin was used in SQL interface as the
    2265                 :      * RelationSyncCache is destroyed when the decoding finishes, but there
    2266                 :      * is no way to unregister the relcache invalidation callback.
    2267                 :      */
    2268 GIC        3135 :     if (RelationSyncCache == NULL)
    2269               6 :         return;
    2270 ECB             : 
    2271                 :     /*
    2272                 :      * Nobody keeps pointers to entries in this hash table around outside
    2273                 :      * logical decoding callback calls - but invalidation events can come in
    2274                 :      * *during* a callback if we do any syscache access in the callback.
    2275                 :      * Because of that we must mark the cache entry as invalid but not damage
    2276                 :      * any of its substructure here.  The next get_rel_sync_entry() call will
    2277                 :      * rebuild it all.
    2278                 :      */
    2279 GIC        3129 :     if (OidIsValid(relid))
    2280                 :     {
    2281                 :         /*
    2282                 :          * Getting invalidations for relations that aren't in the table is
    2283                 :          * entirely normal.  So we don't care if it's found or not.
    2284                 :          */
    2285 CBC        3106 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    2286                 :                                                   HASH_FIND, NULL);
    2287 GIC        3106 :         if (entry != NULL)
    2288             445 :             entry->replicate_valid = false;
    2289                 :     }
    2290                 :     else
    2291                 :     {
    2292                 :         /* Whole cache must be flushed. */
    2293                 :         HASH_SEQ_STATUS status;
    2294                 : 
    2295 CBC          23 :         hash_seq_init(&status, RelationSyncCache);
    2296              53 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2297                 :         {
    2298 GIC          30 :             entry->replicate_valid = false;
    2299                 :         }
    2300                 :     }
    2301                 : }
    2302 ECB             : 
    2303                 : /*
    2304                 :  * Publication relation/schema map syscache invalidation callback
    2305                 :  *
    2306                 :  * Called for invalidations on pg_publication, pg_publication_rel,
    2307                 :  * pg_publication_namespace, and pg_namespace.
                         :  *
                         :  * None of the parameters (arg, cacheid, hashvalue) are read here; every
                         :  * entry in RelationSyncCache gets its replicate_valid flag cleared.
    2308                 :  */
    2309                 : static void
    2310 GIC         638 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    2311 ECB             : {
    2312                 :     HASH_SEQ_STATUS status;
    2313                 :     RelationSyncEntry *entry;
    2314                 : 
    2315                 :     /*
    2316                 :      * We can get here if the plugin was used in SQL interface as the
    2317                 :      * RelationSyncCache is destroyed when the decoding finishes, but there
    2318                 :      * is no way to unregister the invalidation callbacks.
    2319                 :      */
    2320 GIC         638 :     if (RelationSyncCache == NULL)
    2321              24 :         return;
    2322                 : 
    2323                 :     /*
    2324                 :      * We have no easy way to identify which cache entries this invalidation
    2325                 :      * event might have affected, so just mark them all invalid.
    2326                 :      */
    2327             614 :     hash_seq_init(&status, RelationSyncCache);
    2328 CBC        1819 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2329                 :     {
    2330 GIC        1205 :         entry->replicate_valid = false;
    2331 ECB             :     }
    2332                 : }
    2333                 : 
    2334                 : /*
                         :  * Send Replication origin
                         :  *
                         :  * Nothing is written unless send_origin is true and replorigin_by_oid()
                         :  * reports success for origin_id.  In that case a message boundary is
                         :  * made with OutputPluginWrite(ctx, false) followed by
                         :  * OutputPluginPrepareWrite(ctx, true), and the origin name together with
                         :  * origin_lsn is written into ctx->out.  This function does not call
                         :  * OutputPluginWrite() again afterwards; presumably the caller finishes
                         :  * the message -- confirm against the call sites.
                         :  */
    2335                 : static void
    2336 GIC         948 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
    2337 ECB             :                  XLogRecPtr origin_lsn, bool send_origin)
    2338                 : {
    2339 GIC         948 :     if (send_origin)
    2340                 :     {
    2341                 :         char       *origin;
    2342                 : 
    2343                 :         /*----------
    2344                 :          * XXX: which behaviour do we want here?
    2345                 :          *
    2346                 :          * Alternatives:
    2347                 :          *  - don't send origin message if origin name not found
    2348                 :          *    (that's what we do now)
    2349                 :          *  - throw error - that will break replication, not good
    2350                 :          *  - send some special "unknown" origin
    2351                 :          *----------
    2352                 :          */
    2353               7 :         if (replorigin_by_oid(origin_id, true, &origin))
    2354                 :         {
    2355                 :             /* Message boundary */
    2356               7 :             OutputPluginWrite(ctx, false);
    2357               7 :             OutputPluginPrepareWrite(ctx, true);
    2358                 : 
    2359               7 :             logicalrep_write_origin(ctx->out, origin, origin_lsn);
    2360                 :         }
    2361                 :     }
    2362             948 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a