LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - worker.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 93.5 % 1596 1493 392 17 43 3 17 740 388 348 52 717 6 84
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 81 81 62 19 76 5
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  * worker.c
       3                 :  *     PostgreSQL logical replication worker (apply)
       4                 :  *
       5                 :  * Copyright (c) 2016-2023, PostgreSQL Global Development Group
       6                 :  *
       7                 :  * IDENTIFICATION
       8                 :  *    src/backend/replication/logical/worker.c
       9                 :  *
      10                 :  * NOTES
      11                 :  *    This file contains the worker which applies logical changes as they come
      12                 :  *    from the remote logical replication stream.
      13                 :  *
      14                 :  *    The main worker (apply) is started by the logical replication worker
      15                 :  *    launcher for every enabled subscription in a database. It uses the
      16                 :  *    walsender protocol to communicate with the publisher.
      17                 :  *
      18                 :  *    This module includes server-facing code and shares the libpqwalreceiver
      19                 :  *    module with walreceiver for providing the libpq-specific functionality.
      20                 :  *
      21                 :  *
      22                 :  * STREAMED TRANSACTIONS
      23                 :  * ---------------------
      24                 :  * Streamed transactions (large transactions exceeding a memory limit on the
      25                 :  * upstream) are applied using one of two approaches:
      26                 :  *
      27                 :  * 1) Write to temporary files and apply when the final commit arrives
      28                 :  *
      29                 :  * This approach is used when the user has set the subscription's streaming
      30                 :  * option to on.
      31                 :  *
      32                 :  * Unlike the regular (non-streamed) case, handling streamed transactions has
      33                 :  * to handle aborts of both the toplevel transaction and subtransactions. This
      34                 :  * is achieved by tracking offsets for subtransactions, which are then used
      35                 :  * to truncate the file with serialized changes.
      36                 :  *
      37                 :  * The files are placed in the temporary file directory by default, and the
      38                 :  * filenames include both the XID of the toplevel transaction and the OID of
      39                 :  * the subscription. This is necessary so that different workers processing a
      40                 :  * remote transaction with the same XID don't interfere.
      41                 :  *
      42                 :  * We use BufFiles instead of normal temporary files because the BufFile
      43                 :  * infrastructure (a) supports temporary files that exceed the OS file size
      44                 :  * limit, (b) cleans them up automatically on error, and (c) allows these
      45                 :  * files to survive across local transactions, so that they can be opened
      46                 :  * and closed at stream start and stop. We use the FileSet infrastructure
      47                 :  * because without it the files are deleted when they are closed, and if we
      48                 :  * instead kept stream files open across stream start/stop, it would consume
      49                 :  * a lot of memory (more than 8K for each BufFile, and there could be
      50                 :  * multiple such BufFiles as the subscriber could receive multiple
      51                 :  * start/stop streams for different transactions before getting the
      52                 :  * commit). Moreover, without FileSet we would also need to invent a new way
      53                 :  * to pass filenames to the BufFile APIs so that we could open the desired
      54                 :  * file across multiple stream-open calls for the same
      55                 :  * transaction.
      56                 :  *
      57                 :  * 2) Parallel apply workers.
      58                 :  *
      59                 :  * This approach is used when the user has set the subscription's streaming
      60                 :  * option to parallel. See logical/applyparallelworker.c for information about
      61                 :  * this approach.
      62                 :  *
      63                 :  * TWO_PHASE TRANSACTIONS
      64                 :  * ----------------------
      65                 :  * Two phase transactions are replayed at prepare and then committed or
      66                 :  * rolled back at commit prepared and rollback prepared respectively. It is
      67                 :  * possible to have a prepared transaction that arrives at the apply worker
      68                 :  * when the tablesync is busy doing the initial copy. In this case, the apply
      69                 :  * worker skips all the prepared operations [e.g. inserts] while the tablesync
      70                 :  * is still busy (see the condition of should_apply_changes_for_rel). The
      71                 :  * tablesync worker might not get such a prepared transaction because, say, it
      72                 :  * was prior to the initial consistent point but might have got some later
      73                 :  * commits. Now, the tablesync worker will exit without doing anything for the
      74                 :  * prepared transaction skipped by the apply worker as the sync location for it
      75                 :  * will be already ahead of the apply worker's current location. This would lead
      76                 :  * to an "empty prepare", because later when the apply worker does the commit
      77                 :  * prepare, there is nothing in it (the inserts were skipped earlier).
      78                 :  *
      79                 :  * To avoid this and similar prepare confusions, the subscription's two_phase
      80                 :  * commit is enabled only after the initial sync is over. The two_phase option
      81                 :  * has been implemented as a tri-state with values DISABLED, PENDING, and
      82                 :  * ENABLED.
      83                 :  *
      84                 :  * Even if the user specifies they want a subscription with two_phase = on,
      85                 :  * internally it will start with a tri-state of PENDING which only becomes
      86                 :  * ENABLED after all tablesync initializations are completed - i.e. when all
      87                 :  * tablesync workers have reached their READY state. In other words, the value
      88                 :  * PENDING is only a temporary state for subscription start-up.
      89                 :  *
      90                 :  * Until the two_phase is properly available (ENABLED) the subscription will
      91                 :  * behave as if two_phase = off. When the apply worker detects that all
      92                 :  * tablesyncs have become READY (while the tri-state was PENDING) it will
      93                 :  * restart the apply worker process. This happens in
      94                 :  * process_syncing_tables_for_apply.
      95                 :  *
      96                 :  * When the (re-started) apply worker finds that all tablesyncs are READY for a
      97                 :  * two_phase tri-state of PENDING, it starts streaming messages with the
      98                 :  * two_phase option which in turn enables the decoding of two-phase commits at
      99                 :  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
     100                 :  * Now, it is possible that during the time we have not enabled two_phase, the
     101                 :  * publisher (replication server) might have skipped some prepares, but we
     102                 :  * ensure that such prepares are sent along with commit prepare, see
     103                 :  * ReorderBufferFinishPrepared.
     104                 :  *
     105                 :  * If the subscription has no tables then a two_phase tri-state PENDING is
     106                 :  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
     107                 :  * PUBLICATION which might otherwise be disallowed (see below).
     108                 :  *
     109                 :  * If ever a user needs to be aware of the tri-state value, they can fetch it
     110                 :  * from the pg_subscription catalog (see column subtwophasestate).
     111                 :  *
     112                 :  * We don't allow toggling the two_phase option of a subscription because it
     113                 :  * can lead to an inconsistent replica. Consider that it was initially on and
     114                 :  * we received some prepares, then it was turned off; now at commit time the
     115                 :  * server will send the entire transaction data along with the commit. With
     116                 :  * some more analysis, we could allow changing this option from off to on, but
     117                 :  * it is not clear that this alone would be useful.
     118                 :  *
     119                 :  * Finally, to avoid the problems mentioned in the previous paragraphs from any
     120                 :  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
     121                 :  * to 'off' and then again back to 'on') there is a restriction for
     122                 :  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
     123                 :  * the two_phase tri-state is ENABLED, except when copy_data = false.
     124                 :  *
     125                 :  * We can receive a prepare with the same GID more than once in genuine cases
     126                 :  * where multiple subscriptions are defined for publications on the same
     127                 :  * server and the prepared transaction has operations on tables subscribed to
     128                 :  * by those subscriptions. In such cases, if we used the GID sent by the
     129                 :  * publisher, one of the prepares would succeed and the others would fail, in
     130                 :  * which case the server would send them again. This can lead to a deadlock if
     131                 :  * the user has set synchronous_standby_names for all the subscriptions on the
     132                 :  * subscriber. To avoid such deadlocks, we generate a unique GID (consisting
     133                 :  * of the subscription OID and the XID of the prepared transaction) for each
     134                 :  * prepared transaction on the subscriber.
     135                 :  *-------------------------------------------------------------------------
     136                 :  */
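The PENDING to ENABLED promotion described in the header comment can be sketched as a small state function. This is a minimal sketch, not the worker.c implementation: the enum names, next_twophase_state() and twophase_in_effect() are hypothetical, and the real worker also restarts itself and updates pg_subscription.subtwophasestate, which this sketch omits.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the subtwophasestate tri-state values. */
typedef enum
{
    TWOPHASE_DISABLED,
    TWOPHASE_PENDING,
    TWOPHASE_ENABLED
} TwoPhaseState;

/*
 * Decide the next tri-state value given how many tables the subscription
 * has and how many of their tablesyncs have reached READY.
 */
static TwoPhaseState
next_twophase_state(TwoPhaseState cur, int ntables, int nready)
{
    /* Only PENDING is transient; DISABLED and ENABLED are left unchanged. */
    if (cur != TWOPHASE_PENDING)
        return cur;

    /* No tables: stay PENDING so REFRESH PUBLICATION remains allowed. */
    if (ntables == 0)
        return TWOPHASE_PENDING;

    /* Promote only once every tablesync is READY. */
    return (nready == ntables) ? TWOPHASE_ENABLED : TWOPHASE_PENDING;
}

/* Until ENABLED, the subscription behaves as if two_phase = off. */
static bool
twophase_in_effect(TwoPhaseState state)
{
    return state == TWOPHASE_ENABLED;
}
```

Per the comment above, the equivalent check in worker.c happens in process_syncing_tables_for_apply, which restarts the apply worker rather than switching state in place.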
     137                 : 
     138                 : #include "postgres.h"
     139                 : 
     140                 : #include <sys/stat.h>
     141                 : #include <unistd.h>
     142                 : 
     143                 : #include "access/table.h"
     144                 : #include "access/tableam.h"
     145                 : #include "access/twophase.h"
     146                 : #include "access/xact.h"
     147                 : #include "access/xlog_internal.h"
     148                 : #include "catalog/catalog.h"
     149                 : #include "catalog/indexing.h"
     150                 : #include "catalog/namespace.h"
     151                 : #include "catalog/partition.h"
     152                 : #include "catalog/pg_inherits.h"
     153                 : #include "catalog/pg_subscription.h"
     154                 : #include "catalog/pg_subscription_rel.h"
     155                 : #include "catalog/pg_tablespace.h"
     156                 : #include "commands/tablecmds.h"
     157                 : #include "commands/tablespace.h"
     158                 : #include "commands/trigger.h"
     159                 : #include "executor/executor.h"
     160                 : #include "executor/execPartition.h"
     161                 : #include "executor/nodeModifyTable.h"
     162                 : #include "funcapi.h"
     163                 : #include "libpq/pqformat.h"
     164                 : #include "libpq/pqsignal.h"
     165                 : #include "mb/pg_wchar.h"
     166                 : #include "miscadmin.h"
     167                 : #include "nodes/makefuncs.h"
     168                 : #include "optimizer/optimizer.h"
     169                 : #include "parser/parse_relation.h"
     170                 : #include "pgstat.h"
     171                 : #include "postmaster/bgworker.h"
     172                 : #include "postmaster/interrupt.h"
     173                 : #include "postmaster/postmaster.h"
     174                 : #include "postmaster/walwriter.h"
     175                 : #include "replication/decode.h"
     176                 : #include "replication/logical.h"
     177                 : #include "replication/logicallauncher.h"
     178                 : #include "replication/logicalproto.h"
     179                 : #include "replication/logicalrelation.h"
     180                 : #include "replication/logicalworker.h"
     181                 : #include "replication/origin.h"
     182                 : #include "replication/reorderbuffer.h"
     183                 : #include "replication/snapbuild.h"
     184                 : #include "replication/walreceiver.h"
     185                 : #include "replication/worker_internal.h"
     186                 : #include "rewrite/rewriteHandler.h"
     187                 : #include "storage/buffile.h"
     188                 : #include "storage/bufmgr.h"
     189                 : #include "storage/fd.h"
     190                 : #include "storage/ipc.h"
     191                 : #include "storage/lmgr.h"
     192                 : #include "storage/proc.h"
     193                 : #include "storage/procarray.h"
     194                 : #include "tcop/tcopprot.h"
     195                 : #include "utils/acl.h"
     196                 : #include "utils/builtins.h"
     197                 : #include "utils/catcache.h"
     198                 : #include "utils/dynahash.h"
     199                 : #include "utils/datum.h"
     200                 : #include "utils/fmgroids.h"
     201                 : #include "utils/guc.h"
     202                 : #include "utils/inval.h"
     203                 : #include "utils/lsyscache.h"
     204                 : #include "utils/memutils.h"
     205                 : #include "utils/pg_lsn.h"
     206                 : #include "utils/rel.h"
     207                 : #include "utils/rls.h"
     208                 : #include "utils/syscache.h"
     209                 : #include "utils/timeout.h"
     210                 : #include "utils/usercontext.h"
     211                 : 
     212                 : #define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
     213                 : 
     214                 : typedef struct FlushPosition
     215                 : {
     216                 :     dlist_node  node;
     217                 :     XLogRecPtr  local_end;
     218                 :     XLogRecPtr  remote_end;
     219                 : } FlushPosition;
     220                 : 
     221                 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
     222                 : 
     223                 : typedef struct ApplyExecutionData
     224                 : {
     225                 :     EState     *estate;         /* executor state, used to track resources */
     226                 : 
     227                 :     LogicalRepRelMapEntry *targetRel;   /* replication target rel */
     228                 :     ResultRelInfo *targetRelInfo;   /* ResultRelInfo for same */
     229                 : 
     230                 :     /* These fields are used when the target relation is partitioned: */
     231                 :     ModifyTableState *mtstate;  /* dummy ModifyTable state */
     232                 :     PartitionTupleRouting *proute;  /* partition routing info */
     233                 : } ApplyExecutionData;
     234                 : 
     235                 : /* Struct for saving and restoring apply errcontext information */
     236                 : typedef struct ApplyErrorCallbackArg
     237                 : {
     238                 :     LogicalRepMsgType command;  /* 0 if invalid */
     239                 :     LogicalRepRelMapEntry *rel;
     240                 : 
     241                 :     /* Remote node information */
     242                 :     int         remote_attnum;  /* -1 if invalid */
     243                 :     TransactionId remote_xid;
     244                 :     XLogRecPtr  finish_lsn;
     245                 :     char       *origin_name;
     246                 : } ApplyErrorCallbackArg;
     247                 : 
     248                 : /*
     249                 :  * The action to be taken for the changes in the transaction.
     250                 :  *
     251                 :  * TRANS_LEADER_APPLY:
     252                 :  * This action means that we are in the leader apply worker or table sync
     253                 :  * worker. The changes of the transaction are either directly applied or
     254                 :  * are read from temporary files (for streaming transactions) and then
     255                 :  * applied by the worker.
     256                 :  *
     257                 :  * TRANS_LEADER_SERIALIZE:
     258                 :  * This action means that we are in the leader apply worker or table sync
     259                 :  * worker. Changes are written to temporary files and then applied when the
     260                 :  * final commit arrives.
     261                 :  *
     262                 :  * TRANS_LEADER_SEND_TO_PARALLEL:
     263                 :  * This action means that we are in the leader apply worker and need to send
     264                 :  * the changes to the parallel apply worker.
     265                 :  *
     266                 :  * TRANS_LEADER_PARTIAL_SERIALIZE:
     267                 :  * This action means that we are in the leader apply worker and have sent some
     268                 :  * changes directly to the parallel apply worker and the remaining changes are
     269                 :  * serialized to a file, due to a timeout while sending data. The parallel apply
     270                 :  * worker will apply these serialized changes when the final commit arrives.
     271                 :  *
     272                 :  * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
     273                 :  * serializing changes, the leader worker also needs to serialize the
     274                 :  * STREAM_XXX message to a file, and wait for the parallel apply worker to
     275                 :  * finish the transaction when processing the transaction finish command. So
     276                 :  * this new action was introduced to keep the code and logic clear.
     277                 :  *
     278                 :  * TRANS_PARALLEL_APPLY:
     279                 :  * This action means that we are in the parallel apply worker and changes of
     280                 :  * the transaction are applied directly by the worker.
     281                 :  */
     282                 : typedef enum
     283                 : {
     284                 :     /* The action for non-streaming transactions. */
     285                 :     TRANS_LEADER_APPLY,
     286                 : 
     287                 :     /* Actions for streaming transactions. */
     288                 :     TRANS_LEADER_SERIALIZE,
     289                 :     TRANS_LEADER_SEND_TO_PARALLEL,
     290                 :     TRANS_LEADER_PARTIAL_SERIALIZE,
     291                 :     TRANS_PARALLEL_APPLY
     292                 : } TransApplyAction;
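The comment above describes when each TransApplyAction applies. The decision can be sketched as a pure function of a few flags. choose_apply_action() and its boolean parameters are hypothetical simplifications (get_transaction_apply_action(), declared below, instead takes the XID and reports the assigned parallel apply worker), and the enum is redeclared so the sketch compiles on its own.

```c
#include <stdbool.h>

/* Local copy of the enum above so the sketch is self-contained. */
typedef enum
{
    TRANS_LEADER_APPLY,
    TRANS_LEADER_SERIALIZE,
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_LEADER_PARTIAL_SERIALIZE,
    TRANS_PARALLEL_APPLY
} TransApplyAction;

/*
 * am_parallel_worker:      this process is a parallel apply worker
 * has_parallel_worker:     a parallel apply worker was assigned to the xact
 * serialize_after_timeout: the leader fell back to spooling after a timeout
 * streamed:                the transaction is being streamed
 */
static TransApplyAction
choose_apply_action(bool am_parallel_worker, bool has_parallel_worker,
                    bool serialize_after_timeout, bool streamed)
{
    if (am_parallel_worker)
        return TRANS_PARALLEL_APPLY;

    if (has_parallel_worker)
        return serialize_after_timeout ? TRANS_LEADER_PARTIAL_SERIALIZE
                                       : TRANS_LEADER_SEND_TO_PARALLEL;

    /* No parallel worker: spool streamed xacts, apply the rest directly. */
    return streamed ? TRANS_LEADER_SERIALIZE : TRANS_LEADER_APPLY;
}
```

The parallel-worker checks come first because a parallel apply worker is only ever assigned to streamed transactions.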
     293                 : 
     294                 : /* errcontext tracker */
     295                 : ApplyErrorCallbackArg apply_error_callback_arg =
     296                 : {
     297                 :     .command = 0,
     298                 :     .rel = NULL,
     299                 :     .remote_attnum = -1,
     300                 :     .remote_xid = InvalidTransactionId,
     301                 :     .finish_lsn = InvalidXLogRecPtr,
     302                 :     .origin_name = NULL,
     303                 : };
     304                 : 
     305                 : ErrorContextCallback *apply_error_context_stack = NULL;
     306                 : 
     307                 : MemoryContext ApplyMessageContext = NULL;
     308                 : MemoryContext ApplyContext = NULL;
     309                 : 
     310                 : /* per stream context for streaming transactions */
     311                 : static MemoryContext LogicalStreamingContext = NULL;
     312                 : 
     313                 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
     314                 : 
     315                 : Subscription *MySubscription = NULL;
     316                 : static bool MySubscriptionValid = false;
     317                 : 
     318                 : static List *on_commit_wakeup_workers_subids = NIL;
     319                 : 
     320                 : bool        in_remote_transaction = false;
     321                 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
     322                 : 
     323                 : /* fields valid only when processing streamed transaction */
     324                 : static bool in_streamed_transaction = false;
     325                 : 
     326                 : static TransactionId stream_xid = InvalidTransactionId;
     327                 : 
     328                 : /*
     329                 :  * The number of changes applied by the parallel apply worker during one
     330                 :  * streaming block.
     331                 :  */
     332                 : static uint32 parallel_stream_nchanges = 0;
     333                 : 
     334                 : /*
     335                 :  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
     336                 :  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
     337                 :  * Once we start skipping changes, we don't stop it until we skip all changes of
     338                 :  * the transaction even if pg_subscription is updated and MySubscription->skiplsn
     339                 :  * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
     340                 :  * we don't skip receiving and spooling the changes since we decide whether or not
     341                 :  * to skip applying the changes when starting to apply changes. The subskiplsn is
     342                 :  * cleared after successfully skipping the transaction or applying a non-empty
     343                 :  * transaction. The latter prevents a mistakenly specified subskiplsn from
     344                 :  * being left behind. Note that we cannot skip streaming transactions when
     345                 :  * using parallel apply workers because we cannot get the finish LSN before
     346                 :  * applying the changes. So, we don't start a parallel apply worker when the
     347                 :  * finish LSN is set by the user.
     348                 :  */
     349                 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
     350                 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
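The skip rule in the comment above (start skipping when the remote finish LSN equals subskiplsn, and keep skipping until the whole transaction has been skipped, even if subskiplsn changes meanwhile) can be sketched as follows. The sketch_* names are hypothetical; XLogRecPtr is redeclared as a 64-bit integer with 0 as the invalid value, mirroring PostgreSQL's InvalidXLogRecPtr.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;            /* stand-in for PostgreSQL's type */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Finish LSN of the transaction currently being skipped, if any. */
static XLogRecPtr sketch_skip_lsn = InvalidXLogRecPtr;

static bool
sketch_is_skipping(void)
{
    return sketch_skip_lsn != InvalidXLogRecPtr;
}

/* Called at transaction start with the subscription's current skip LSN. */
static void
sketch_maybe_start_skipping(XLogRecPtr subskiplsn, XLogRecPtr finish_lsn)
{
    if (subskiplsn != InvalidXLogRecPtr && subskiplsn == finish_lsn)
        sketch_skip_lsn = finish_lsn;
}

/*
 * Called once all changes of the transaction have been skipped. Changes to
 * subskiplsn before this point do not affect sketch_skip_lsn.
 */
static void
sketch_stop_skipping(void)
{
    sketch_skip_lsn = InvalidXLogRecPtr;
}
```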
     351                 : 
     352                 : /* BufFile handle of the current streaming file */
     353                 : static BufFile *stream_fd = NULL;
     354                 : 
     355                 : typedef struct SubXactInfo
     356                 : {
     357                 :     TransactionId xid;          /* XID of the subxact */
     358                 :     int         fileno;         /* file number in the buffile */
     359                 :     off_t       offset;         /* offset in the file */
     360                 : } SubXactInfo;
     361                 : 
     362                 : /* Sub-transaction data for the current streaming transaction */
     363                 : typedef struct ApplySubXactData
     364                 : {
     365                 :     uint32      nsubxacts;      /* number of sub-transactions */
     366                 :     uint32      nsubxacts_max;  /* current capacity of subxacts */
     367                 :     TransactionId subxact_last; /* xid of the last sub-transaction */
     368                 :     SubXactInfo *subxacts;      /* sub-xact offset in changes file */
     369                 : } ApplySubXactData;
     370                 : 
     371                 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
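The subxact_data bookkeeping above exists so that a subtransaction abort can truncate the changes file back to where that subtransaction's changes began. A minimal in-memory sketch, assuming a fixed-size char buffer in place of a BufFile and a fixed subxact capacity; StreamFile, stream_append() and stream_abort_subxact() are hypothetical and ignore the fileno part of a real BufFile position.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;     /* stand-in for PostgreSQL's type */

#define SKETCH_MAX_SUBXACTS 16      /* fixed capacity, sketch only */

typedef struct
{
    TransactionId xid;              /* XID of the subxact */
    size_t      offset;             /* where its first change was written */
} SketchSubXact;

typedef struct
{
    TransactionId toplevel;         /* XID of the streamed toplevel xact */
    char        buf[1024];          /* in-memory stand-in for the changes file */
    size_t      len;
    SketchSubXact subxacts[SKETCH_MAX_SUBXACTS];
    int         nsubxacts;
} StreamFile;

/* Append one serialized change; record the offset of each new subxact. */
static int
stream_append(StreamFile *f, TransactionId xid, const char *change)
{
    size_t      n = strlen(change);

    if (f->len + n > sizeof(f->buf))
        return -1;

    if (xid != f->toplevel)
    {
        int         i;

        /* Search backwards: recent subxacts are the likeliest match. */
        for (i = f->nsubxacts - 1; i >= 0; i--)
            if (f->subxacts[i].xid == xid)
                break;

        if (i < 0)
        {
            if (f->nsubxacts == SKETCH_MAX_SUBXACTS)
                return -1;
            f->subxacts[f->nsubxacts].xid = xid;
            f->subxacts[f->nsubxacts].offset = f->len;
            f->nsubxacts++;
        }
    }

    memcpy(f->buf + f->len, change, n);
    f->len += n;
    return 0;
}

/* Subxact abort: truncate to its start and forget it and later subxacts. */
static void
stream_abort_subxact(StreamFile *f, TransactionId xid)
{
    for (int i = f->nsubxacts - 1; i >= 0; i--)
    {
        if (f->subxacts[i].xid == xid)
        {
            f->len = f->subxacts[i].offset;
            f->nsubxacts = i;
            return;
        }
    }
    /* Unknown XID: the subxact wrote no changes, so nothing to truncate. */
}
```

Dropping every entry after the aborted one mirrors the idea that later subxacts' changes lie beyond the truncation point and are discarded with it.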
     372                 : 
     373                 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
     374                 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
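Per the header comment, stream file names combine the subscription OID and the toplevel XID, so two subscriptions receiving the same remote XID use different files. A sketch under the assumption of a "<subid>-<xid>.changes" / "<subid>-<xid>.subxacts" naming scheme; the helper names and exact format strings are illustrative and are not taken from this listing.

```c
#include <stdint.h>
#include <stdio.h>

typedef uint32_t Oid;               /* stand-ins for PostgreSQL's typedefs */
typedef uint32_t TransactionId;

/* Name of the file holding serialized changes (format is an assumption). */
static void
sketch_changes_filename(char *path, size_t len, Oid subid, TransactionId xid)
{
    snprintf(path, len, "%u-%u.changes", (unsigned) subid, (unsigned) xid);
}

/* Name of the file holding subxact offsets (format is an assumption). */
static void
sketch_subxact_filename(char *path, size_t len, Oid subid, TransactionId xid)
{
    snprintf(path, len, "%u-%u.subxacts", (unsigned) subid, (unsigned) xid);
}
```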
     375                 : 
     376                 : /*
     377                 :  * Information about subtransactions of a given toplevel transaction.
     378                 :  */
     379                 : static void subxact_info_write(Oid subid, TransactionId xid);
     380                 : static void subxact_info_read(Oid subid, TransactionId xid);
     381                 : static void subxact_info_add(TransactionId xid);
     382                 : static inline void cleanup_subxact_info(void);
     383                 : 
     384                 : /*
     385                 :  * Serialize and deserialize changes for a toplevel transaction.
     386                 :  */
     387                 : static void stream_open_file(Oid subid, TransactionId xid,
     388                 :                              bool first_segment);
     389                 : static void stream_write_change(char action, StringInfo s);
     390                 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
     391                 : static void stream_close_file(void);
     392                 : 
     393                 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
     394                 : 
     395                 : static void DisableSubscriptionAndExit(void);
     396                 : 
     397                 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
     398                 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
     399                 :                                          ResultRelInfo *relinfo,
     400                 :                                          TupleTableSlot *remoteslot);
     401                 : static void apply_handle_update_internal(ApplyExecutionData *edata,
     402                 :                                          ResultRelInfo *relinfo,
     403                 :                                          TupleTableSlot *remoteslot,
     404                 :                                          LogicalRepTupleData *newtup,
     405                 :                                          Oid localindexoid);
     406                 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
     407                 :                                          ResultRelInfo *relinfo,
     408                 :                                          TupleTableSlot *remoteslot,
     409                 :                                          Oid localindexoid);
     410                 : static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
     411                 :                                     LogicalRepRelation *remoterel,
     412                 :                                     Oid localidxoid,
     413                 :                                     TupleTableSlot *remoteslot,
     414                 :                                     TupleTableSlot **localslot);
     415                 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
     416                 :                                        TupleTableSlot *remoteslot,
     417                 :                                        LogicalRepTupleData *newtup,
     418                 :                                        CmdType operation);
     419                 : 
     420                 : /* Compute GID for two_phase transactions */
     421                 : static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
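The header comment says the subscriber-side GID is built from the subscription OID and the remote XID, which keeps prepares arriving through different subscriptions distinct. A minimal sketch; the "pg_gid_%u_%u" format and sketch_twophase_gid() are assumptions for illustration, not necessarily what TwoPhaseTransactionGid() produces.

```c
#include <stdint.h>
#include <stdio.h>

/* Build a per-subscription GID from the subscription OID and remote XID. */
static void
sketch_twophase_gid(uint32_t subid, uint32_t xid, char *gid, size_t szgid)
{
    snprintf(gid, szgid, "pg_gid_%u_%u", (unsigned) subid, (unsigned) xid);
}
```

The same remote XID prepared through two subscriptions (say OIDs 16394 and 16395) then yields two different local GIDs, so neither prepare fails on a duplicate GID.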
     422                 : 
     423                 : /* Functions for skipping changes */
     424                 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
     425                 : static void stop_skipping_changes(void);
     426                 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
     427                 : 
     428                 : /* Functions for apply error callback */
     429                 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
     430                 : static inline void reset_apply_error_context_info(void);
     431                 : 
     432                 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
     433                 :                                                      ParallelApplyWorkerInfo **winfo);
     434                 : 
     435                 : /*
     436                 :  * Return the name of the logical replication worker.
     437                 :  */
     438                 : static const char *
     439 GNC         187 : get_worker_name(void)
     440                 : {
     441             187 :     if (am_tablesync_worker())
     442 UNC           0 :         return _("logical replication table synchronization worker");
     443 GNC         187 :     else if (am_parallel_apply_worker())
     444              10 :         return _("logical replication parallel apply worker");
     445                 :     else
     446             177 :         return _("logical replication apply worker");
     447                 : }
     448                 : 
     449                 : /*
     450                 :  * Form the origin name for the subscription.
     451                 :  *
     452                 :  * This is a common function for tablesync and other workers. Tablesync workers
     453                 :  * must pass a valid relid. Other callers must pass relid = InvalidOid.
     454                 :  *
     455                 :  * Return the name in the supplied buffer.
     456                 :  */
     457                 : void
     458             940 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
     459                 :                                    char *originname, Size szoriginname)
     460                 : {
     461             940 :     if (OidIsValid(relid))
     462                 :     {
     463                 :         /* Replication origin name for tablesync workers. */
     464             598 :         snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
     465                 :     }
     466                 :     else
     467                 :     {
     468                 :         /* Replication origin name for non-tablesync workers. */
     469             342 :         snprintf(originname, szoriginname, "pg_%u", suboid);
     470                 :     }
     471             940 : }
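The origin-naming rule above can be illustrated with a small standalone sketch. It assumes `Oid` is a 32-bit unsigned integer (as in PostgreSQL) and uses a hypothetical helper, `origin_name_sketch()`, and a hypothetical `INVALID_OID_SKETCH` constant equal to 0. It mirrors the two `snprintf()` formats shown above but is not the server function itself.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t Oid;                   /* assumption: Oid is 32-bit unsigned */
#define INVALID_OID_SKETCH ((Oid) 0)    /* hypothetical stand-in for InvalidOid */

/*
 * Hypothetical re-implementation of the naming rule:
 * "pg_<suboid>_<relid>" for tablesync workers (valid relid),
 * "pg_<suboid>" for every other worker (relid == InvalidOid).
 */
static void
origin_name_sketch(Oid suboid, Oid relid, char *buf, size_t buflen)
{
    if (relid != INVALID_OID_SKETCH)
        snprintf(buf, buflen, "pg_%u_%u", (unsigned) suboid, (unsigned) relid);
    else
        snprintf(buf, buflen, "pg_%u", (unsigned) suboid);
}
```

With suboid 16394 and relid 16385 the sketch produces `pg_16394_16385`; with relid 0 it produces `pg_16394`. A 64-byte buffer is ample for two 10-digit OIDs plus the prefix (an assumption for this sketch, not the buffer size the server uses).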
     472                 : 
     473                 : /*
     474                 :  * Should this worker apply changes for the given relation?
     475                 :  *
     476                 :  * This is mainly needed for the initial relation data sync, as that runs in a
     477                 :  * separate worker process in parallel, and we need some way to skip changes
     478                 :  * coming to the leader apply worker during the sync of a table.
     479                 :  *
     480                 :  * Note that we need a less-than-or-equal comparison for the SYNCDONE state
     481                 :  * because it might hold the position of the end of the initial slot's
     482                 :  * consistent point WAL record + 1 (i.e., the start of the next record), and
     483                 :  * the next record can be the COMMIT of the transaction we are now processing
     484                 :  * (which is what we set remote_final_lsn to in apply_handle_begin).
     485                 :  *
     486                 :  * Note that for streaming transactions that are being applied in the parallel
     487                 :  * apply worker, we disallow applying changes if the target table in the
     488                 :  * subscription is not in the READY state, because we cannot decide whether to
     489                 :  * apply the change as we won't know remote_final_lsn by that time.
     490                 :  *
     491                 :  * We already checked this in pa_can_start() before assigning the
     492                 :  * streaming transaction to the parallel worker, but it also needs to be
     493                 :  * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
     494                 :  * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
     495                 :  * while applying this transaction.
     496                 :  */
     497                 : static bool
     498 GIC      147877 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     499                 : {
     500          147877 :     if (am_tablesync_worker())
     501 UIC           0 :         return MyLogicalRepWorker->relid == rel->localreloid;
     502 GNC      147877 :     else if (am_parallel_apply_worker())
     503                 :     {
     504                 :         /* We don't synchronize rels that are in an unknown state. */
     505           68365 :         if (rel->state != SUBREL_STATE_READY &&
     506 UNC           0 :             rel->state != SUBREL_STATE_UNKNOWN)
     507               0 :             ereport(ERROR,
     508                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     509                 :                      errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
     510                 :                             MySubscription->name),
     511                 :                      errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
     512                 : 
     513 GNC       68365 :         return rel->state == SUBREL_STATE_READY;
     514                 :     }
     515                 :     else
     516 GIC       79554 :         return (rel->state == SUBREL_STATE_READY ||
     517              42 :                 (rel->state == SUBREL_STATE_SYNCDONE &&
     518               4 :                  rel->statelsn <= remote_final_lsn));
     519                 : }
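The three-way decision in `should_apply_changes_for_rel()` can be modeled as a pure function. This is a sketch under stated assumptions: the worker kinds, the `RelState` values and the `RelSketch` struct are hypothetical stand-ins for `am_tablesync_worker()`/`am_parallel_apply_worker()`, the `SUBREL_STATE_*` codes and `LogicalRepRelMapEntry`, and the `-1` return value stands in for the `ereport(ERROR)` path.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* assumption: LSNs are 64-bit unsigned */

/* Hypothetical stand-ins for the worker kinds and subscription-rel states. */
typedef enum { WORKER_TABLESYNC, WORKER_PARALLEL_APPLY, WORKER_LEADER } WorkerKind;
typedef enum { REL_INIT, REL_DATASYNC, REL_SYNCDONE, REL_READY, REL_UNKNOWN } RelState;

typedef struct
{
    unsigned    localreloid;
    RelState    state;
    XLogRecPtr  statelsn;
} RelSketch;

/*
 * Returns 1 to apply the change, 0 to skip it, and -1 where the real code
 * raises an ERROR (a parallel apply worker meeting an unsynchronized table).
 */
static int
should_apply_sketch(WorkerKind kind, unsigned my_relid, const RelSketch *rel,
                    XLogRecPtr remote_final_lsn)
{
    switch (kind)
    {
        case WORKER_TABLESYNC:
            /* A tablesync worker only applies changes for its own table. */
            return my_relid == rel->localreloid;
        case WORKER_PARALLEL_APPLY:
            if (rel->state != REL_READY && rel->state != REL_UNKNOWN)
                return -1;
            return rel->state == REL_READY;
        case WORKER_LEADER:
        default:
            /* "<=" because statelsn can equal this transaction's COMMIT LSN. */
            return rel->state == REL_READY ||
                (rel->state == REL_SYNCDONE && rel->statelsn <= remote_final_lsn);
    }
}
```

For a SYNCDONE relation with `statelsn` 1000, the sketch applies a transaction whose `remote_final_lsn` is exactly 1000 and skips one ending at 999, which is the boundary case the `<=` comparison exists for.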
     520                 : 
     521                 : /*
     522                 :  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
     523                 :  *
     524                 :  * Start a transaction, if this is the first step (else we keep using the
     525                 :  * existing transaction).
     526                 :  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
     527                 :  */
     528                 : static void
     529          148324 : begin_replication_step(void)
     530                 : {
     531          148324 :     SetCurrentStatementStartTimestamp();
     532                 : 
     533          148324 :     if (!IsTransactionState())
     534                 :     {
     535             849 :         StartTransactionCommand();
     536             849 :         maybe_reread_subscription();
     537                 :     }
     538                 : 
     539          148324 :     PushActiveSnapshot(GetTransactionSnapshot());
     540                 : 
     541          148324 :     MemoryContextSwitchTo(ApplyMessageContext);
     542          148324 : }
     543                 : 
     544                 : /*
     545                 :  * Finish up one step of a replication transaction.
     546                 :  * Callers of begin_replication_step() must also call this.
     547                 :  *
     548                 :  * We don't close out the transaction here, but we should increment
     549                 :  * the command counter to make the effects of this step visible.
     550                 :  */
     551                 : static void
     552          148296 : end_replication_step(void)
     553                 : {
     554          148296 :     PopActiveSnapshot();
     555                 : 
     556          148296 :     CommandCounterIncrement();
     557          148296 : }
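The bracketing contract of `begin_replication_step()` and `end_replication_step()` can be modeled with a few counters. The `StepState` struct and both functions below are hypothetical: they track only whether a transaction is open, the active-snapshot depth and the command counter, and they omit `maybe_reread_subscription()` and the memory-context switch.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical model of the step bracketing: only the first step of a
 * remote transaction starts a local transaction; every step pushes a
 * snapshot on entry, pops it on exit, and bumps the command counter so
 * later steps see the effects of earlier ones.
 */
typedef struct
{
    bool        in_xact;            /* stands in for IsTransactionState() */
    int         xacts_started;      /* count of StartTransactionCommand() calls */
    int         snapshot_depth;     /* active snapshot stack depth */
    unsigned    command_id;         /* stands in for the command counter */
} StepState;

static void
begin_step_sketch(StepState *st)
{
    if (!st->in_xact)
    {
        st->in_xact = true;         /* StartTransactionCommand() */
        st->xacts_started++;
    }
    st->snapshot_depth++;           /* PushActiveSnapshot(...) */
}

static void
end_step_sketch(StepState *st)
{
    assert(st->snapshot_depth > 0); /* must pair with begin_step_sketch() */
    st->snapshot_depth--;           /* PopActiveSnapshot() */
    st->command_id++;               /* CommandCounterIncrement() */
}
```

Two consecutive steps start only one transaction, leave the snapshot stack empty and advance the command counter twice, so the second step sees the first step's changes; the transaction itself stays open for the commit handler to close.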
     558                 : 
     559                 : /*
     560                 :  * Handle streamed transactions for both the leader apply worker and the
     561                 :  * parallel apply workers.
     562                 :  *
     563                 :  * In the streaming case (receiving a block of the streamed transaction), for
     564                 :  * serialize mode, simply redirect it to a file for the proper toplevel
     565                 :  * transaction, and for parallel mode, the leader apply worker will send the
     566                 :  * changes to parallel apply workers and the parallel apply worker will define
     567                 :  * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
     568                 :  * messages will be applied by both leader apply worker and parallel apply
     569                 :  * workers).
     570                 :  *
     571                 :  * Returns true for streamed transactions (when the change is either serialized
     572                 :  * to file or sent to parallel apply worker), false otherwise (regular mode or
     573                 :  * needs to be processed by parallel apply worker).
     574                 :  *
     575                 :  * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
     576                 :  * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
     577                 :  * to a parallel apply worker.
     578                 :  */
     579                 : static bool
     580          324124 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
     581                 : {
     582                 :     TransactionId current_xid;
     583                 :     ParallelApplyWorkerInfo *winfo;
     584                 :     TransApplyAction apply_action;
     585                 :     StringInfoData original_msg;
     586                 : 
     587 GNC      324124 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
     588                 : 
     589 ECB             :     /* not in streaming mode */
     590 GNC      324124 :     if (apply_action == TRANS_LEADER_APPLY)
     591 CBC       79826 :         return false;
     592 ECB             : 
     593 CBC      244298 :     Assert(TransactionIdIsValid(stream_xid));
     594                 : 
     595                 :     /*
     596                 :      * The parallel apply worker needs the xid in this message to decide
     597                 :      * whether to define a savepoint, so save the original message that has
     598                 :      * not moved the cursor after the xid. We will serialize this message to a
     599                 :      * file in PARTIAL_SERIALIZE mode.
     600                 :      */
     601 GNC      244298 :     original_msg = *s;
     602                 : 
     603                 :     /*
     604                 :      * We should have received XID of the subxact as the first part of the
     605                 :      * message, so extract it.
     606                 :      */
     607          244298 :     current_xid = pq_getmsgint(s, 4);
     608                 : 
     609          244298 :     if (!TransactionIdIsValid(current_xid))
     610 UIC           0 :         ereport(ERROR,
     611                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     612                 :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
     613 ECB             : 
     614 GNC      244298 :     switch (apply_action)
     615                 :     {
     616          102513 :         case TRANS_LEADER_SERIALIZE:
     617          102513 :             Assert(stream_fd);
     618 ECB             : 
     619                 :             /* Add the new subxact to the array (unless already there). */
     620 GNC      102513 :             subxact_info_add(current_xid);
     621 ECB             : 
     622                 :             /* Write the change to the current file */
     623 GNC      102513 :             stream_write_change(action, s);
     624          102513 :             return true;
     625                 : 
     626           68386 :         case TRANS_LEADER_SEND_TO_PARALLEL:
     627           68386 :             Assert(winfo);
     628                 : 
     629                 :             /*
     630                 :              * XXX The publisher side doesn't always send relation/type update
     631                 :              * messages after the streaming transaction, so also update the
     632                 :              * relation/type in leader apply worker. See function
     633                 :              * cleanup_rel_sync_cache.
     634                 :              */
     635           68386 :             if (pa_send_data(winfo, s->len, s->data))
     636           68386 :                 return (action != LOGICAL_REP_MSG_RELATION &&
     637                 :                         action != LOGICAL_REP_MSG_TYPE);
     638                 : 
     639                 :             /*
     640                 :              * Switch to serialize mode when we are not able to send the
     641                 :              * change to parallel apply worker.
     642                 :              */
     643 UNC           0 :             pa_switch_to_partial_serialize(winfo, false);
     644                 : 
     645                 :             /* fall through */
     646 GNC        5006 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
     647            5006 :             stream_write_change(action, &original_msg);
     648                 : 
     649                 :             /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
     650            5006 :             return (action != LOGICAL_REP_MSG_RELATION &&
     651                 :                     action != LOGICAL_REP_MSG_TYPE);
     652                 : 
     653           68393 :         case TRANS_PARALLEL_APPLY:
     654           68393 :             parallel_stream_nchanges += 1;
     655                 : 
     656                 :             /* Define a savepoint for a subxact if needed. */
     657           68393 :             pa_start_subtrans(current_xid, stream_xid);
     658           68393 :             return false;
     659                 : 
     660 UNC           0 :         default:
     661               0 :             Assert(false);
     662                 :             return false;       /* silence compiler warning */
     663                 :     }
     664                 : }
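The return-value rules of `handle_streamed_transaction()` are easy to misread, so the sketch below reduces them to a table. The enum names are hypothetical mirrors of the `TransApplyAction` cases and of a subset of `LogicalRepMsgType`; the real function also writes files, sends data and defines savepoints, all of which are omitted here.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirrors of the TransApplyAction cases handled above. */
typedef enum
{
    LEADER_APPLY,
    LEADER_SERIALIZE,
    LEADER_SEND_TO_PARALLEL,
    LEADER_PARTIAL_SERIALIZE,
    PARALLEL_APPLY
} ApplyActionSketch;

/* Hypothetical subset of message kinds that affect the return value. */
typedef enum { MSG_INSERT, MSG_RELATION, MSG_TYPE } MsgKindSketch;

/*
 * Returns true when the caller should stop processing the change itself
 * (it was spooled to a file or handed to a parallel worker), false when
 * the caller must still apply it locally.
 */
static bool
streamed_change_consumed(ApplyActionSketch action, MsgKindSketch msg)
{
    switch (action)
    {
        case LEADER_APPLY:
            return false;           /* not streaming: apply normally */
        case LEADER_SERIALIZE:
            return true;            /* written to the stream file */
        case LEADER_SEND_TO_PARALLEL:
            /* a failed send switches to partial serialize mode */
            /* fall through */
        case LEADER_PARTIAL_SERIALIZE:
            /* RELATION/TYPE also go through the leader to keep its cache current */
            return msg != MSG_RELATION && msg != MSG_TYPE;
        case PARALLEL_APPLY:
            return false;           /* the parallel worker applies it itself */
    }
    return false;
}
```

Because a failed `pa_send_data()` falls through to the partial-serialize case, which returns the same expression, the outcome of the send does not change the return value; the sketch therefore takes no parameter for it.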
     665                 : 
     666                 : /*
     667 ECB             :  * Executor state preparation for evaluation of constraint expressions,
     668                 :  * indexes and triggers for the specified relation.
     669                 :  *
     670                 :  * Note that the caller must open and close any indexes to be updated.
     671                 :  */
     672                 : static ApplyExecutionData *
     673 GIC      147811 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
     674                 : {
     675                 :     ApplyExecutionData *edata;
     676                 :     EState     *estate;
     677                 :     RangeTblEntry *rte;
     678 GNC      147811 :     List       *perminfos = NIL;
     679                 :     ResultRelInfo *resultRelInfo;
     680                 : 
     681 GIC      147811 :     edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
     682          147811 :     edata->targetRel = rel;
     683                 : 
     684          147811 :     edata->estate = estate = CreateExecutorState();
     685                 : 
     686          147811 :     rte = makeNode(RangeTblEntry);
     687          147811 :     rte->rtekind = RTE_RELATION;
     688          147811 :     rte->relid = RelationGetRelid(rel->localrel);
     689          147811 :     rte->relkind = rel->localrel->rd_rel->relkind;
     690          147811 :     rte->rellockmode = AccessShareLock;
     691                 : 
     692 GNC      147811 :     addRTEPermissionInfo(&perminfos, rte);
     693                 : 
     694          147811 :     ExecInitRangeTable(estate, list_make1(rte), perminfos);
     695                 : 
     696 GIC      147811 :     edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
     697                 : 
     698                 :     /*
     699                 :      * Use Relation opened by logicalrep_rel_open() instead of opening it
     700 ECB             :      * again.
     701                 :      */
     702 CBC      147811 :     InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
     703 EUB             : 
     704 ECB             :     /*
     705                 :      * We put the ResultRelInfo in the es_opened_result_relations list, even
     706                 :      * though we don't populate the es_result_relations array.  That's a bit
     707                 :      * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
     708 EUB             :      *
     709                 :      * ExecOpenIndices() is not called here either, each execution path doing
     710                 :      * an apply operation being responsible for that.
     711                 :      */
     712 GIC      147811 :     estate->es_opened_result_relations =
     713          147811 :         lappend(estate->es_opened_result_relations, resultRelInfo);
     714                 : 
     715 CBC      147811 :     estate->es_output_cid = GetCurrentCommandId(true);
     716                 : 
     717                 :     /* Prepare to catch AFTER triggers. */
     718          147811 :     AfterTriggerBeginQuery();
     719 ECB             : 
     720                 :     /* other fields of edata remain NULL for now */
     721                 : 
     722 GIC      147811 :     return edata;
     723                 : }
     724                 : 
     725                 : /*
     726                 :  * Finish any operations related to the executor state created by
     727                 :  * create_edata_for_relation().
     728                 :  */
     729                 : static void
     730          147792 : finish_edata(ApplyExecutionData *edata)
     731 ECB             : {
     732 GIC      147792 :     EState     *estate = edata->estate;
     733 ECB             : 
     734                 :     /* Handle any queued AFTER triggers. */
     735 CBC      147792 :     AfterTriggerEndQuery(estate);
     736                 : 
     737 ECB             :     /* Shut down tuple routing, if any was done. */
     738 CBC      147792 :     if (edata->proute)
     739 GIC          73 :         ExecCleanupTupleRouting(edata->mtstate, edata->proute);
     740                 : 
     741 ECB             :     /*
     742                 :      * Cleanup.  It might seem that we should call ExecCloseResultRelations()
     743                 :      * here, but we intentionally don't.  It would close the rel we added to
     744                 :      * es_opened_result_relations above, which is wrong because we took no
     745                 :      * corresponding refcount.  We rely on ExecCleanupTupleRouting() to close
     746                 :      * any other relations opened during execution.
     747                 :      */
     748 GIC      147792 :     ExecResetTupleTable(estate->es_tupleTable, false);
     749          147792 :     FreeExecutorState(estate);
     750          147792 :     pfree(edata);
     751          147792 : }
     752                 : 
     753                 : /*
     754 ECB             :  * Evaluates default values for columns that we can't map to remote
     755                 :  * relation columns.
     756                 :  *
     757                 :  * This allows us to support tables which have more columns on the downstream
     758                 :  * than on the upstream.
     759                 :  */
     760                 : static void
     761 GIC       75593 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
     762                 :                    TupleTableSlot *slot)
     763                 : {
     764           75593 :     TupleDesc   desc = RelationGetDescr(rel->localrel);
     765           75593 :     int         num_phys_attrs = desc->natts;
     766                 :     int         i;
     767                 :     int         attnum,
     768           75593 :                 num_defaults = 0;
     769                 :     int        *defmap;
     770                 :     ExprState **defexprs;
     771                 :     ExprContext *econtext;
     772                 : 
     773           75593 :     econtext = GetPerTupleExprContext(estate);
     774                 : 
     775                 :     /* We got all the data via replication, no need to evaluate anything. */
     776           75593 :     if (num_phys_attrs == rel->remoterel.natts)
     777           35459 :         return;
     778                 : 
     779           40134 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     780           40134 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
     781                 : 
     782 CBC       40134 :     Assert(rel->attrmap->maplen == num_phys_attrs);
     783 GIC      210619 :     for (attnum = 0; attnum < num_phys_attrs; attnum++)
     784                 :     {
     785                 :         Expr       *defexpr;
     786                 : 
     787          170485 :         if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
     788 UIC           0 :             continue;
     789 ECB             : 
     790 GIC      170485 :         if (rel->attrmap->attnums[attnum] >= 0)
     791           92256 :             continue;
     792 ECB             : 
     793 CBC       78229 :         defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
     794                 : 
     795           78229 :         if (defexpr != NULL)
     796                 :         {
     797                 :             /* Run the expression through planner */
     798 GIC       70131 :             defexpr = expression_planner(defexpr);
     799                 : 
     800                 :             /* Initialize executable expression in copycontext */
     801           70131 :             defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
     802           70131 :             defmap[num_defaults] = attnum;
     803 CBC       70131 :             num_defaults++;
     804                 :         }
     805                 :     }
     806                 : 
     807 GIC      110265 :     for (i = 0; i < num_defaults; i++)
     808           70131 :         slot->tts_values[defmap[i]] =
     809 CBC       70131 :             ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
     810                 : }
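A simplified model of `slot_fill_defaults()`: columns that the attribute map marks as not sent by the publisher (`attnums[i] < 0`) get their local default, and columns without a default keep the NULL already stored by `slot_store_data()`. The `DefaultFn` callback type is a hypothetical stand-in for planning and evaluating `build_column_default()` expressions. The sketch assumes defaults never evaluate to NULL, and it leaves out the dropped/generated-column checks and the early return taken when the local and remote column counts match.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical default "expression": returns the default value for a column. */
typedef long (*DefaultFn) (void);

/*
 * attnums[i] is the remote column number for local column i, or -1 if the
 * publisher doesn't send it (assumption: mirrors rel->attrmap->attnums).
 * defaults[i] is NULL when local column i has no DEFAULT.
 */
static void
fill_defaults_sketch(int natts, const int *attnums, DefaultFn const *defaults,
                     long *values, bool *isnull)
{
    for (int i = 0; i < natts; i++)
    {
        if (attnums[i] >= 0)
            continue;               /* value came from the publisher */
        if (defaults[i] == NULL)
            continue;               /* no default: keep the NULL in place */
        values[i] = defaults[i]();
        isnull[i] = false;          /* assumption: defaults are never NULL */
    }
}

/* Example default used in the usage below: DEFAULT 42. */
static long default_42(void) { return 42; }
```

For a local table with four columns where only columns 0 and 2 are replicated, a default on column 1 is filled in, while column 3 (no default) stays NULL.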
     811 ECB             : 
     812 EUB             : /*
     813                 :  * Store tuple data into slot.
     814                 :  *
     815                 :  * Incoming data can be either text or binary format.
     816 ECB             :  */
     817                 : static void
     818 CBC      147811 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     819 ECB             :                 LogicalRepTupleData *tupleData)
     820                 : {
     821 GIC      147811 :     int         natts = slot->tts_tupleDescriptor->natts;
     822 ECB             :     int         i;
     823                 : 
     824 GIC      147811 :     ExecClearTuple(slot);
     825 ECB             : 
     826                 :     /* Call the "in" function for each non-dropped, non-null attribute */
     827 GIC      147811 :     Assert(natts == rel->attrmap->maplen);
     828 CBC      656938 :     for (i = 0; i < natts; i++)
     829 ECB             :     {
     830 GIC      509127 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     831          509127 :         int         remoteattnum = rel->attrmap->attnums[i];
     832                 : 
     833          509127 :         if (!att->attisdropped && remoteattnum >= 0)
     834          302237 :         {
     835          302237 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
     836                 : 
     837 CBC      302237 :             Assert(remoteattnum < tupleData->ncols);
     838 ECB             : 
     839                 :             /* Set attnum for error callback */
     840 GIC      302237 :             apply_error_callback_arg.remote_attnum = remoteattnum;
     841                 : 
     842          302237 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
     843                 :             {
     844                 :                 Oid         typinput;
     845 EUB             :                 Oid         typioparam;
     846                 : 
     847 GIC      141940 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     848 CBC      283880 :                 slot->tts_values[i] =
     849          141940 :                     OidInputFunctionCall(typinput, colvalue->data,
     850                 :                                          typioparam, att->atttypmod);
     851 GIC      141940 :                 slot->tts_isnull[i] = false;
     852 ECB             :             }
     853 GIC      160297 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
     854                 :             {
     855 ECB             :                 Oid         typreceive;
     856                 :                 Oid         typioparam;
     857                 : 
     858                 :                 /*
     859                 :                  * In some code paths we may be asked to re-parse the same
     860                 :                  * tuple data.  Reset the StringInfo's cursor so that works.
     861                 :                  */
     862 GBC      109980 :                 colvalue->cursor = 0;
     863 EUB             : 
     864 GIC      109980 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
     865          219960 :                 slot->tts_values[i] =
     866          109980 :                     OidReceiveFunctionCall(typreceive, colvalue,
     867                 :                                            typioparam, att->atttypmod);
     868                 : 
     869                 :                 /* Trouble if it didn't eat the whole buffer */
     870          109980 :                 if (colvalue->cursor != colvalue->len)
     871 UIC           0 :                     ereport(ERROR,
     872                 :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
     873                 :                              errmsg("incorrect binary data format in logical replication column %d",
     874                 :                                     remoteattnum + 1)));
     875 CBC      109980 :                 slot->tts_isnull[i] = false;
     876                 :             }
     877                 :             else
     878                 :             {
     879                 :                 /*
     880 ECB             :                  * NULL value from remote.  (We don't expect to see
     881                 :                  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
     882                 :                  * NULL.)
     883                 :                  */
     884 CBC       50317 :                 slot->tts_values[i] = (Datum) 0;
     885 GIC       50317 :                 slot->tts_isnull[i] = true;
     886 ECB             :             }
     887                 : 
     888                 :             /* Reset attnum for error callback */
     889 CBC      302237 :             apply_error_callback_arg.remote_attnum = -1;
     890 ECB             :         }
     891                 :         else
     892                 :         {
     893                 :             /*
     894                 :              * We assign NULL to dropped attributes and missing values
     895                 :              * (missing values should be later filled using
     896                 :              * slot_fill_defaults).
     897                 :              */
     898 CBC      206890 :             slot->tts_values[i] = (Datum) 0;
     899 GIC      206890 :             slot->tts_isnull[i] = true;
     900                 :         }
     901                 :     }
     902                 : 
     903          147811 :     ExecStoreVirtualTuple(slot);
     904 CBC      147811 : }
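The column-mapping logic of `slot_store_data()` can be sketched for a table whose columns are all 32-bit integers. The `ColStatus` and `RemoteCol` types and the `store_int4_sketch()` function are hypothetical. `strtol()` stands in for the type input function, a hand-written big-endian decode stands in for the binary receive function, and a `false` return stands in for the "incorrect binary data format" error raised when the receive step does not consume the whole buffer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical per-column status, modeled on the LOGICALREP_COLUMN_* kinds. */
typedef enum { COL_NULL, COL_UNCHANGED, COL_TEXT, COL_BINARY } ColStatus;

typedef struct
{
    ColStatus   status;
    const char *data;   /* text value, or raw bytes for COL_BINARY */
    int         len;    /* byte length of data */
} RemoteCol;

/*
 * attnums[i] maps local column i to a remote column, or -1 when the
 * column is dropped or not published.  Returns false on a malformed
 * binary value.
 */
static bool
store_int4_sketch(int natts, const int *attnums, const RemoteCol *cols,
                  int32_t *values, bool *isnull)
{
    for (int i = 0; i < natts; i++)
    {
        int         r = attnums[i];

        values[i] = 0;
        isnull[i] = true;           /* dropped/missing columns become NULL */
        if (r < 0)
            continue;

        const RemoteCol *c = &cols[r];

        if (c->status == COL_TEXT)
        {
            values[i] = (int32_t) strtol(c->data, NULL, 10);  /* "in" function */
            isnull[i] = false;
        }
        else if (c->status == COL_BINARY)
        {
            int         cursor = 0; /* reset, as the real code does */
            const unsigned char *p = (const unsigned char *) c->data;

            if (c->len < 4)
                return false;
            values[i] = (int32_t) (((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
                                   ((uint32_t) p[2] << 8) | (uint32_t) p[3]);
            cursor += 4;
            if (cursor != c->len)   /* receive step left bytes unread */
                return false;
            isnull[i] = false;
        }
        /* COL_NULL (and an unexpected COL_UNCHANGED) stay NULL */
    }
    return true;
}
```

A 4-byte binary value is accepted, but the same bytes declared with a length of 5 are rejected, because the decode step consumes only four of them.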
     905                 : 
     906                 : /*
     907                 :  * Replace updated columns with data from the LogicalRepTupleData struct.
     908                 :  * This is somewhat similar to heap_modify_tuple but also calls the type
     909                 :  * input functions on the user data.
     910                 :  *
     911                 :  * "slot" is filled with a copy of the tuple in "srcslot", replacing
     912                 :  * columns provided in "tupleData" and leaving others as-is.
     913                 :  *
     914 ECB             :  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
     915                 :  * storage for "srcslot".  This is OK for current usage, but someday we may
     916                 :  * need to materialize "slot" at the end to make it independent of "srcslot".
     917                 :  */
     918                 : static void
     919 GIC       31908 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
     920 ECB             :                  LogicalRepRelMapEntry *rel,
     921                 :                  LogicalRepTupleData *tupleData)
     922                 : {
     923 GIC       31908 :     int         natts = slot->tts_tupleDescriptor->natts;
     924 ECB             :     int         i;
     925                 : 
     926                 :     /* We'll fill "slot" with a virtual tuple, so we must start with ... */
     927 GIC       31908 :     ExecClearTuple(slot);
     928                 : 
     929                 :     /*
     930                 :      * Copy all the column data from srcslot, so that we'll have valid values
     931                 :      * for unreplaced columns.
     932 ECB             :      */
     933 GIC       31908 :     Assert(natts == srcslot->tts_tupleDescriptor->natts);
     934 CBC       31908 :     slot_getallattrs(srcslot);
     935 GIC       31908 :     memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
     936           31908 :     memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
     937 ECB             : 
     938                 :     /* Call the "in" function for each replaced attribute */
     939 GIC       31908 :     Assert(natts == rel->attrmap->maplen);
     940 CBC      159221 :     for (i = 0; i < natts; i++)
     941 ECB             :     {
     942 GIC      127313 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     943          127313 :         int         remoteattnum = rel->attrmap->attnums[i];
     944                 : 
     945          127313 :         if (remoteattnum < 0)
     946           58514 :             continue;
     947                 : 
     948           68799 :         Assert(remoteattnum < tupleData->ncols);
     949                 : 
     950 CBC       68799 :         if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
     951 ECB             :         {
     952 CBC       68796 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
     953 ECB             : 
     954                 :             /* Set attnum for error callback */
     955 GIC       68796 :             apply_error_callback_arg.remote_attnum = remoteattnum;
     956                 : 
     957           68796 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
     958                 :             {
     959                 :                 Oid         typinput;
     960                 :                 Oid         typioparam;
     961                 : 
     962           25393 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     963 CBC       50786 :                 slot->tts_values[i] =
     964 GIC       25393 :                     OidInputFunctionCall(typinput, colvalue->data,
     965                 :                                          typioparam, att->atttypmod);
     966 CBC       25393 :                 slot->tts_isnull[i] = false;
     967 ECB             :             }
     968 GIC       43403 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
     969                 :             {
     970 ECB             :                 Oid         typreceive;
     971                 :                 Oid         typioparam;
     972                 : 
     973                 :                 /*
     974                 :                  * In some code paths we may be asked to re-parse the same
     975                 :                  * tuple data.  Reset the StringInfo's cursor so that works.
     976                 :                  */
     977 GIC       43356 :                 colvalue->cursor = 0;
     978 ECB             : 
     979 CBC       43356 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
     980 GIC       86712 :                 slot->tts_values[i] =
     981 CBC       43356 :                     OidReceiveFunctionCall(typreceive, colvalue,
     982 ECB             :                                            typioparam, att->atttypmod);
     983                 : 
     984                 :                 /* Trouble if it didn't eat the whole buffer */
     985 CBC       43356 :                 if (colvalue->cursor != colvalue->len)
     986 UIC           0 :                     ereport(ERROR,
     987                 :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
     988                 :                              errmsg("incorrect binary data format in logical replication column %d",
     989 ECB             :                                     remoteattnum + 1)));
     990 GBC       43356 :                 slot->tts_isnull[i] = false;
     991                 :             }
     992 ECB             :             else
     993                 :             {
     994                 :                 /* must be LOGICALREP_COLUMN_NULL */
     995 CBC          47 :                 slot->tts_values[i] = (Datum) 0;
     996 GIC          47 :                 slot->tts_isnull[i] = true;
     997 ECB             :             }
     998                 : 
     999                 :             /* Reset attnum for error callback */
    1000 CBC       68796 :             apply_error_callback_arg.remote_attnum = -1;
    1001                 :         }
    1002                 :     }
    1003 ECB             : 
    1004                 :     /* And finally, declare that "slot" contains a valid virtual tuple */
    1005 CBC       31908 :     ExecStoreVirtualTuple(slot);
    1006 GIC       31908 : }
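The binary branch of slot_store_data() resets colvalue->cursor, lets the type's receive function consume bytes, and rejects the column if any bytes are left over. The sketch below shows that pattern on its own. It is a minimal, hypothetical stand-in: Buf, recv_int4() and parse_column() are illustrative names, not PostgreSQL APIs, and it assumes a 4-byte big-endian integer as the only column type.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo: just the fields the check needs. */
typedef struct
{
    const char *data;
    int         len;
    int         cursor;
} Buf;

/*
 * Hypothetical "receive" routine for a 4-byte big-endian integer.  Like a
 * type's receive function, it advances the cursor past the bytes it reads.
 * Returns false if fewer than 4 bytes remain.
 */
static bool
recv_int4(Buf *buf, int32_t *out)
{
    uint32_t    v;

    assert(buf->cursor >= 0 && buf->cursor <= buf->len);
    if (buf->len - buf->cursor < 4)
        return false;

    v = ((uint32_t) (unsigned char) buf->data[buf->cursor] << 24) |
        ((uint32_t) (unsigned char) buf->data[buf->cursor + 1] << 16) |
        ((uint32_t) (unsigned char) buf->data[buf->cursor + 2] << 8) |
        (uint32_t) (unsigned char) buf->data[buf->cursor + 3];
    buf->cursor += 4;
    *out = (int32_t) v;
    return true;
}

/*
 * Parse one column value: reset the cursor so the same buffer can be
 * re-parsed, run the receive routine, and reject the value unless it
 * consumed the whole buffer.
 */
static bool
parse_column(Buf *buf, int32_t *out)
{
    buf->cursor = 0;
    if (!recv_int4(buf, out))
        return false;
    return buf->cursor == buf->len;
}
```

Resetting the cursor first is what makes re-parsing the same tuple data safe; without it, a second call would start at the end of the buffer and fail.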
    1007                 : 
    1008                 : /*
    1009 ECB             :  * Handle BEGIN message.
    1010                 :  */
    1011                 : static void
    1012 GIC         380 : apply_handle_begin(StringInfo s)
    1013                 : {
    1014                 :     LogicalRepBeginData begin_data;
    1015                 : 
    1016                 :     /* There must not be an active streaming transaction. */
    1017 GNC         380 :     Assert(!TransactionIdIsValid(stream_xid));
    1018                 : 
    1019 GIC         380 :     logicalrep_read_begin(s, &begin_data);
    1020             380 :     set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
    1021                 : 
    1022             380 :     remote_final_lsn = begin_data.final_lsn;
    1023 ECB             : 
    1024 GIC         380 :     maybe_start_skipping_changes(begin_data.final_lsn);
    1025                 : 
    1026 CBC         380 :     in_remote_transaction = true;
    1027                 : 
    1028 GIC         380 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1029 CBC         380 : }
    1030                 : 
    1031                 : /*
    1032 ECB             :  * Handle COMMIT message.
    1033                 :  *
    1034                 :  * TODO, support tracking of multiple origins
    1035                 :  */
    1036                 : static void
    1037 GIC         355 : apply_handle_commit(StringInfo s)
    1038 ECB             : {
    1039                 :     LogicalRepCommitData commit_data;
    1040                 : 
    1041 GIC         355 :     logicalrep_read_commit(s, &commit_data);
    1042 ECB             : 
    1043 GIC         355 :     if (commit_data.commit_lsn != remote_final_lsn)
    1044 UIC           0 :         ereport(ERROR,
    1045 ECB             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1046                 :                  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
    1047                 :                                  LSN_FORMAT_ARGS(commit_data.commit_lsn),
    1048                 :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
    1049                 : 
    1050 GIC         355 :     apply_handle_commit_internal(&commit_data);
    1051                 : 
    1052 ECB             :     /* Process any tables that are being synchronized in parallel. */
    1053 CBC         355 :     process_syncing_tables(commit_data.end_lsn);
    1054 ECB             : 
    1055 GIC         355 :     pgstat_report_activity(STATE_IDLE, NULL);
    1056 CBC         355 :     reset_apply_error_context_info();
    1057 GIC         355 : }
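apply_handle_commit() compares commit_lsn with remote_final_lsn and reports both using the %X/%X format. As a sketch, assuming XLogRecPtr is a 64-bit unsigned integer and that LSN_FORMAT_ARGS splits it into its high and low 32-bit halves, the formatting works as below. format_lsn() is a hypothetical helper, not a PostgreSQL function.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;

/* Render an LSN as "<high 32 bits>/<low 32 bits>" in uppercase hex. */
static void
format_lsn(XLogRecPtr lsn, char *out, size_t outlen)
{
    assert(out != NULL && outlen > 0);
    snprintf(out, outlen, "%X/%X",
             (unsigned int) (uint32_t) (lsn >> 32),
             (unsigned int) (uint32_t) lsn);
}
```

For example, 0x000000010000A0B8 is printed as 1/A0B8 and 0x16B3748 as 0/16B3748.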
    1058 ECB             : 
    1059                 : /*
    1060                 :  * Handle BEGIN PREPARE message.
    1061                 :  */
    1062                 : static void
    1063 GIC          14 : apply_handle_begin_prepare(StringInfo s)
    1064                 : {
    1065                 :     LogicalRepPreparedTxnData begin_data;
    1066                 : 
    1067 ECB             :     /* Tablesync should never receive prepare. */
    1068 GIC          14 :     if (am_tablesync_worker())
    1069 LBC           0 :         ereport(ERROR,
    1070 ECB             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1071                 :                  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
    1072                 : 
    1073                 :     /* There must not be an active streaming transaction. */
    1074 GNC          14 :     Assert(!TransactionIdIsValid(stream_xid));
    1075                 : 
    1076 GIC          14 :     logicalrep_read_begin_prepare(s, &begin_data);
    1077              14 :     set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
    1078 ECB             : 
    1079 GBC          14 :     remote_final_lsn = begin_data.prepare_lsn;
    1080                 : 
    1081 GIC          14 :     maybe_start_skipping_changes(begin_data.prepare_lsn);
    1082                 : 
    1083 CBC          14 :     in_remote_transaction = true;
    1084                 : 
    1085 GIC          14 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1086              14 : }
    1087                 : 
    1088                 : /*
    1089                 :  * Common function to prepare the GID.
    1090                 :  */
    1091                 : static void
    1092 CBC          21 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
    1093 ECB             : {
    1094                 :     char        gid[GIDSIZE];
    1095                 : 
    1096                 :     /*
    1097                 :      * Compute unique GID for two_phase transactions. We don't use GID of
    1098                 :      * prepared transaction sent by server as that can lead to deadlock when
    1099                 :      * we have multiple subscriptions from same node point to publications on
    1100                 :      * the same node. See comments atop worker.c
    1101                 :      */
    1102 GIC          21 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
    1103                 :                            gid, sizeof(gid));
    1104                 : 
    1105                 :     /*
    1106 ECB             :      * BeginTransactionBlock is necessary to balance the EndTransactionBlock
    1107                 :      * called within the PrepareTransactionBlock below.
    1108                 :      */
    1109 GNC          21 :     if (!IsTransactionBlock())
    1110                 :     {
    1111              21 :         BeginTransactionBlock();
    1112              21 :         CommitTransactionCommand(); /* Completes the preceding Begin command. */
    1113                 :     }
    1114 ECB             : 
    1115                 :     /*
    1116                 :      * Update origin state so we can restart streaming from correct position
    1117                 :      * in case of crash.
    1118                 :      */
    1119 GIC          21 :     replorigin_session_origin_lsn = prepare_data->end_lsn;
    1120              21 :     replorigin_session_origin_timestamp = prepare_data->prepare_time;
    1121                 : 
    1122              21 :     PrepareTransactionBlock(gid);
    1123              21 : }
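apply_handle_prepare_internal() builds the GID locally from the subscription OID and the remote xid rather than reusing the publisher's GID. The sketch below assumes the pg_gid_<subid>_<xid> layout that TwoPhaseTransactionGid() uses in the PostgreSQL versions we are aware of; make_subscriber_gid() and GID_BUF_SIZE are hypothetical names. It shows why two subscriptions that receive the same remote xid still end up with distinct local GIDs.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int Oid;           /* 32-bit, as in PostgreSQL */
typedef unsigned int TransactionId; /* 32-bit, as in PostgreSQL */

#define GID_BUF_SIZE 200            /* assumption: same size as GIDSIZE */

/* Hypothetical: derive a subscriber-local GID from subscription and xid. */
static int
make_subscriber_gid(Oid subid, TransactionId xid, char *gid, size_t szgid)
{
    assert(subid != 0 && xid != 0); /* both must be valid */
    return snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
```

Because the subscription OID is part of the name, the same remote xid yields a different GID per subscription, so the prepared transactions cannot collide.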
    1124                 : 
    1125                 : /*
    1126                 :  * Handle PREPARE message.
    1127                 :  */
    1128                 : static void
    1129              13 : apply_handle_prepare(StringInfo s)
    1130 ECB             : {
    1131                 :     LogicalRepPreparedTxnData prepare_data;
    1132                 : 
    1133 GIC          13 :     logicalrep_read_prepare(s, &prepare_data);
    1134 ECB             : 
    1135 GIC          13 :     if (prepare_data.prepare_lsn != remote_final_lsn)
    1136 UIC           0 :         ereport(ERROR,
    1137                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1138 ECB             :                  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
    1139                 :                                  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
    1140                 :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
    1141                 : 
    1142                 :     /*
     1143                 :      * Unlike commit, we always prepare the transaction here, even if no
     1144                 :      * change has happened in it or all of its changes were skipped. It
    1145                 :      * is done this way because at commit prepared time, we won't know whether
    1146                 :      * we have skipped preparing a transaction because of those reasons.
    1147                 :      *
    1148                 :      * XXX, We can optimize such that at commit prepared time, we first check
    1149                 :      * whether we have prepared the transaction or not but that doesn't seem
    1150                 :      * worthwhile because such cases shouldn't be common.
    1151                 :      */
    1152 GIC          13 :     begin_replication_step();
    1153 ECB             : 
    1154 CBC          13 :     apply_handle_prepare_internal(&prepare_data);
    1155                 : 
    1156              13 :     end_replication_step();
    1157              13 :     CommitTransactionCommand();
    1158 GIC          13 :     pgstat_report_stat(false);
    1159 ECB             : 
    1160 GNC          13 :     store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
    1161 ECB             : 
    1162 GIC          13 :     in_remote_transaction = false;
    1163 ECB             : 
    1164                 :     /* Process any tables that are being synchronized in parallel. */
    1165 GIC          13 :     process_syncing_tables(prepare_data.end_lsn);
    1166 ECB             : 
    1167                 :     /*
    1168                 :      * Since we have already prepared the transaction, in a case where the
    1169                 :      * server crashes before clearing the subskiplsn, it will be left but the
    1170                 :      * transaction won't be resent. But that's okay because it's a rare case
    1171                 :      * and the subskiplsn will be cleared when finishing the next transaction.
    1172                 :      */
    1173 CBC          13 :     stop_skipping_changes();
    1174              13 :     clear_subscription_skip_lsn(prepare_data.prepare_lsn);
    1175 ECB             : 
    1176 GIC          13 :     pgstat_report_activity(STATE_IDLE, NULL);
    1177 CBC          13 :     reset_apply_error_context_info();
    1178 GIC          13 : }
    1179 ECB             : 
    1180                 : /*
    1181                 :  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
    1182                 :  *
    1183                 :  * Note that we don't need to wait here if the transaction was prepared in a
    1184                 :  * parallel apply worker. In that case, we have already waited for the prepare
    1185                 :  * to finish in apply_handle_stream_prepare() which will ensure all the
    1186                 :  * operations in that transaction have happened in the subscriber, so no
    1187                 :  * concurrent transaction can cause deadlock or transaction dependency issues.
    1188                 :  */
    1189                 : static void
    1190 GIC          19 : apply_handle_commit_prepared(StringInfo s)
    1191                 : {
    1192                 :     LogicalRepCommitPreparedTxnData prepare_data;
    1193                 :     char        gid[GIDSIZE];
    1194 ECB             : 
    1195 GIC          19 :     logicalrep_read_commit_prepared(s, &prepare_data);
    1196 CBC          19 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
    1197 ECB             : 
    1198                 :     /* Compute GID for two_phase transactions. */
    1199 GIC          19 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
    1200                 :                            gid, sizeof(gid));
    1201                 : 
    1202 ECB             :     /* There is no transaction when COMMIT PREPARED is called */
    1203 GBC          19 :     begin_replication_step();
    1204                 : 
    1205                 :     /*
    1206                 :      * Update origin state so we can restart streaming from correct position
    1207 ECB             :      * in case of crash.
    1208                 :      */
    1209 GIC          19 :     replorigin_session_origin_lsn = prepare_data.end_lsn;
    1210              19 :     replorigin_session_origin_timestamp = prepare_data.commit_time;
    1211                 : 
    1212 CBC          19 :     FinishPreparedTransaction(gid, true);
    1213              19 :     end_replication_step();
    1214 GIC          19 :     CommitTransactionCommand();
    1215              19 :     pgstat_report_stat(false);
    1216                 : 
    1217 GNC          19 :     store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
    1218 GIC          19 :     in_remote_transaction = false;
    1219                 : 
    1220                 :     /* Process any tables that are being synchronized in parallel. */
    1221              19 :     process_syncing_tables(prepare_data.end_lsn);
    1222 ECB             : 
    1223 CBC          19 :     clear_subscription_skip_lsn(prepare_data.end_lsn);
    1224                 : 
    1225 GIC          19 :     pgstat_report_activity(STATE_IDLE, NULL);
    1226              19 :     reset_apply_error_context_info();
    1227              19 : }
    1228                 : 
    1229 ECB             : /*
     1230                 :  * Handle a ROLLBACK PREPARED of a previously PREPARED transaction.
    1231                 :  *
    1232                 :  * Note that we don't need to wait here if the transaction was prepared in a
    1233                 :  * parallel apply worker. In that case, we have already waited for the prepare
    1234                 :  * to finish in apply_handle_stream_prepare() which will ensure all the
    1235                 :  * operations in that transaction have happened in the subscriber, so no
    1236                 :  * concurrent transaction can cause deadlock or transaction dependency issues.
    1237                 :  */
    1238                 : static void
    1239 GIC           5 : apply_handle_rollback_prepared(StringInfo s)
    1240 ECB             : {
    1241                 :     LogicalRepRollbackPreparedTxnData rollback_data;
    1242                 :     char        gid[GIDSIZE];
    1243                 : 
    1244 GIC           5 :     logicalrep_read_rollback_prepared(s, &rollback_data);
    1245 CBC           5 :     set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
    1246                 : 
    1247 ECB             :     /* Compute GID for two_phase transactions. */
    1248 GIC           5 :     TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
    1249 ECB             :                            gid, sizeof(gid));
    1250                 : 
    1251                 :     /*
     1252                 :      * It is possible that we haven't received the prepare because it occurred
     1253                 :      * before walsender reached a consistent point or before two_phase was
     1254                 :      * enabled, so in such cases, we need to skip the rollback
     1255                 :      * prepared.
    1256                 :      */
    1257 GIC           5 :     if (LookupGXact(gid, rollback_data.prepare_end_lsn,
    1258                 :                     rollback_data.prepare_time))
    1259                 :     {
    1260 ECB             :         /*
    1261                 :          * Update origin state so we can restart streaming from correct
    1262                 :          * position in case of crash.
    1263                 :          */
    1264 CBC           5 :         replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
    1265 GIC           5 :         replorigin_session_origin_timestamp = rollback_data.rollback_time;
    1266 ECB             : 
    1267 EUB             :         /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
    1268 GIC           5 :         begin_replication_step();
    1269               5 :         FinishPreparedTransaction(gid, false);
    1270               5 :         end_replication_step();
    1271               5 :         CommitTransactionCommand();
    1272                 : 
    1273 CBC           5 :         clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
    1274                 :     }
    1275                 : 
    1276               5 :     pgstat_report_stat(false);
    1277                 : 
    1278 GNC           5 :     store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
    1279 CBC           5 :     in_remote_transaction = false;
    1280 ECB             : 
    1281                 :     /* Process any tables that are being synchronized in parallel. */
    1282 GIC           5 :     process_syncing_tables(rollback_data.rollback_end_lsn);
    1283                 : 
    1284               5 :     pgstat_report_activity(STATE_IDLE, NULL);
    1285               5 :     reset_apply_error_context_info();
    1286 CBC           5 : }
    1287                 : 
    1288                 : /*
    1289                 :  * Handle STREAM PREPARE.
    1290                 :  */
    1291                 : static void
    1292 GIC          11 : apply_handle_stream_prepare(StringInfo s)
    1293 ECB             : {
    1294                 :     LogicalRepPreparedTxnData prepare_data;
    1295                 :     ParallelApplyWorkerInfo *winfo;
    1296                 :     TransApplyAction apply_action;
    1297                 : 
    1298                 :     /* Save the message before it is consumed. */
    1299 GNC          11 :     StringInfoData original_msg = *s;
    1300 ECB             : 
    1301 CBC          11 :     if (in_streamed_transaction)
    1302 UIC           0 :         ereport(ERROR,
    1303 ECB             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1304                 :                  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
    1305                 : 
    1306                 :     /* Tablesync should never receive prepare. */
    1307 CBC          11 :     if (am_tablesync_worker())
    1308 UIC           0 :         ereport(ERROR,
    1309 ECB             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1310                 :                  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
    1311                 : 
    1312 GIC          11 :     logicalrep_read_stream_prepare(s, &prepare_data);
    1313              11 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
    1314                 : 
    1315 GNC          11 :     apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
    1316 ECB             : 
    1317 GNC          11 :     switch (apply_action)
    1318                 :     {
    1319               5 :         case TRANS_LEADER_APPLY:
    1320                 : 
    1321                 :             /*
    1322                 :              * The transaction has been serialized to file, so replay all the
    1323                 :              * spooled operations.
    1324                 :              */
    1325               5 :             apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
    1326                 :                                    prepare_data.xid, prepare_data.prepare_lsn);
    1327                 : 
    1328                 :             /* Mark the transaction as prepared. */
    1329               5 :             apply_handle_prepare_internal(&prepare_data);
    1330                 : 
    1331               5 :             CommitTransactionCommand();
    1332                 : 
    1333               5 :             store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
    1334                 : 
    1335               5 :             in_remote_transaction = false;
    1336                 : 
    1337                 :             /* Unlink the files with serialized changes and subxact info. */
    1338               5 :             stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
    1339                 : 
    1340               5 :             elog(DEBUG1, "finished processing the STREAM PREPARE command");
    1341               5 :             break;
    1342                 : 
    1343               2 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1344               2 :             Assert(winfo);
    1345                 : 
    1346               2 :             if (pa_send_data(winfo, s->len, s->data))
    1347                 :             {
    1348                 :                 /* Finish processing the streaming transaction. */
    1349               2 :                 pa_xact_finish(winfo, prepare_data.end_lsn);
    1350               2 :                 break;
    1351                 :             }
    1352                 : 
    1353                 :             /*
    1354                 :              * Switch to serialize mode when we are not able to send the
    1355                 :              * change to parallel apply worker.
    1356                 :              */
    1357 UNC           0 :             pa_switch_to_partial_serialize(winfo, true);
    1358                 : 
    1359                 :             /* fall through */
    1360 GNC           1 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1361               1 :             Assert(winfo);
    1362                 : 
    1363               1 :             stream_open_and_write_change(prepare_data.xid,
    1364                 :                                          LOGICAL_REP_MSG_STREAM_PREPARE,
    1365                 :                                          &original_msg);
    1366                 : 
    1367               1 :             pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    1368                 : 
    1369                 :             /* Finish processing the streaming transaction. */
    1370               1 :             pa_xact_finish(winfo, prepare_data.end_lsn);
    1371               1 :             break;
    1372                 : 
    1373               3 :         case TRANS_PARALLEL_APPLY:
    1374                 : 
    1375                 :             /*
    1376                 :              * If the parallel apply worker is applying spooled messages then
    1377                 :              * close the file before preparing.
    1378                 :              */
    1379               3 :             if (stream_fd)
    1380               1 :                 stream_close_file();
    1381                 : 
    1382               3 :             begin_replication_step();
    1383                 : 
    1384                 :             /* Mark the transaction as prepared. */
    1385               3 :             apply_handle_prepare_internal(&prepare_data);
    1386                 : 
    1387               3 :             end_replication_step();
    1388                 : 
    1389               3 :             CommitTransactionCommand();
    1390                 : 
    1391               3 :             MyParallelShared->last_commit_end = XactLastCommitEnd;
    1392                 : 
    1393               3 :             pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1394               3 :             pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    1395                 : 
    1396               3 :             pa_reset_subtrans();
    1397                 : 
    1398               3 :             elog(DEBUG1, "finished processing the STREAM PREPARE command");
    1399               3 :             break;
    1400                 : 
    1401 UNC           0 :         default:
    1402               0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1403                 :             break;
    1404                 :     }
    1405                 : 
    1406 GIC          11 :     pgstat_report_stat(false);
    1407 ECB             : 
    1408                 :     /* Process any tables that are being synchronized in parallel. */
    1409 CBC          11 :     process_syncing_tables(prepare_data.end_lsn);
    1410 ECB             : 
    1411                 :     /*
     1412                 :      * As in the prepare case, the subskiplsn could be left set in the case of
     1413                 :      * a server crash, but that's okay. See the comments in apply_handle_prepare().
    1414                 :      */
    1415 GIC          11 :     stop_skipping_changes();
    1416              11 :     clear_subscription_skip_lsn(prepare_data.prepare_lsn);
    1417 ECB             : 
    1418 CBC          11 :     pgstat_report_activity(STATE_IDLE, NULL);
    1419                 : 
    1420              11 :     reset_apply_error_context_info();
    1421              11 : }
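apply_handle_stream_prepare() picks one of four paths from the apply action, and a failed pa_send_data() falls through from the send-to-parallel case into the partial-serialize case. The sketch below models only that control flow. The enum declarations and stream_prepare_path() are hypothetical; the case labels are copied from the listing, but the real TransApplyAction has other members not shown here.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical subset of the apply actions handled above. */
typedef enum
{
    TRANS_LEADER_APPLY,
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_LEADER_PARTIAL_SERIALIZE,
    TRANS_PARALLEL_APPLY
} TransApplyAction;

/* Which branch of the handler ends up doing the work. */
typedef enum
{
    PATH_REPLAY_SPOOLED,        /* leader replays the spooled file */
    PATH_SENT_TO_PARALLEL,      /* message handed to the parallel worker */
    PATH_SERIALIZED,            /* message written to file for the worker */
    PATH_PREPARED_BY_PARALLEL   /* parallel worker prepares it itself */
} PreparePath;

static PreparePath
stream_prepare_path(TransApplyAction action, bool send_ok)
{
    switch (action)
    {
        case TRANS_LEADER_APPLY:
            return PATH_REPLAY_SPOOLED;
        case TRANS_LEADER_SEND_TO_PARALLEL:
            if (send_ok)
                return PATH_SENT_TO_PARALLEL;
            /* Send failed: switch to serialize mode. */
            /* fall through */
        case TRANS_LEADER_PARTIAL_SERIALIZE:
            return PATH_SERIALIZED;
        case TRANS_PARALLEL_APPLY:
            return PATH_PREPARED_BY_PARALLEL;
    }
    assert(false);              /* mirrors the "unexpected apply action" error */
    return PATH_REPLAY_SPOOLED;
}
```

Letting the failed-send case fall through keeps a single copy of the serialization code, which is also how the handler above avoids duplicating it.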
    1422                 : 
    1423                 : /*
    1424                 :  * Handle ORIGIN message.
    1425                 :  *
    1426                 :  * TODO, support tracking of multiple origins
    1427 ECB             :  */
    1428                 : static void
    1429 GIC           7 : apply_handle_origin(StringInfo s)
    1430                 : {
    1431 ECB             :     /*
    1432                 :      * ORIGIN message can only come inside streaming transaction or inside
    1433                 :      * remote transaction and before any actual writes.
    1434 EUB             :      */
    1435 GIC           7 :     if (!in_streamed_transaction &&
    1436              10 :         (!in_remote_transaction ||
    1437               5 :          (IsTransactionState() && !am_tablesync_worker())))
    1438 UIC           0 :         ereport(ERROR,
    1439                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1440                 :                  errmsg_internal("ORIGIN message sent out of order")));
    1441 GIC           7 : }
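The ORIGIN check accepts the message inside a streamed transaction, or inside a remote transaction before any local transaction has started; a tablesync worker is also accepted when a local transaction is already open. The sketch restates the same boolean condition as a pure function. origin_message_allowed() is hypothetical, the flags are passed in rather than read from worker globals, and in_local_xact stands in for IsTransactionState().

```c
#include <assert.h>
#include <stdbool.h>

/* Same condition as apply_handle_origin(), negated into an "allowed" test. */
static bool
origin_message_allowed(bool in_streamed_transaction,
                       bool in_remote_transaction,
                       bool in_local_xact,       /* IsTransactionState() */
                       bool is_tablesync_worker) /* am_tablesync_worker() */
{
    bool        out_of_order = !in_streamed_transaction &&
        (!in_remote_transaction ||
         (in_local_xact && !is_tablesync_worker));

    return !out_of_order;
}
```

For an apply worker, the ORIGIN message is therefore valid only between BEGIN and the first change that opens a local transaction.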
    1442                 : 
    1443                 : /*
    1444                 :  * Initialize fileset (if not already done).
    1445                 :  *
    1446                 :  * Create a new file when first_segment is true, otherwise open the existing
    1447                 :  * file.
    1448                 :  */
    1449                 : void
    1450 GNC         361 : stream_start_internal(TransactionId xid, bool first_segment)
    1451                 : {
    1452             361 :     begin_replication_step();
    1453                 : 
    1454                 :     /*
    1455                 :      * Initialize the worker's stream_fileset if we haven't yet. This will be
    1456                 :      * used for the entire duration of the worker so create it in a permanent
    1457                 :      * context. We create this on the very first streaming message from any
    1458                 :      * transaction and then use it for this and other streaming transactions.
    1459                 :      * Now, we could create a fileset at the start of the worker as well but
    1460                 :      * then we won't be sure that it will ever be used.
    1461                 :      */
    1462             361 :     if (!MyLogicalRepWorker->stream_fileset)
    1463                 :     {
    1464                 :         MemoryContext oldctx;
    1465                 : 
    1466              14 :         oldctx = MemoryContextSwitchTo(ApplyContext);
    1467                 : 
    1468              14 :         MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
    1469              14 :         FileSetInit(MyLogicalRepWorker->stream_fileset);
    1470                 : 
    1471              14 :         MemoryContextSwitchTo(oldctx);
    1472                 :     }
    1473                 : 
    1474                 :     /* Open the spool file for this transaction. */
    1475             361 :     stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
    1476                 : 
    1477                 :     /* If this is not the first segment, open existing subxact file. */
    1478             361 :     if (!first_segment)
    1479             329 :         subxact_info_read(MyLogicalRepWorker->subid, xid);
    1480                 : 
    1481             361 :     end_replication_step();
    1482             361 : }
    1483                 : 
    1484                 : /*
    1485                 :  * Handle STREAM START message.
    1486                 :  */
    1487                 : static void
    1488 GIC         835 : apply_handle_stream_start(StringInfo s)
    1489                 : {
    1490                 :     bool        first_segment;
    1491                 :     ParallelApplyWorkerInfo *winfo;
    1492                 :     TransApplyAction apply_action;
    1493                 : 
    1494                 :     /* Save the message before it is consumed. */
    1495 GNC         835 :     StringInfoData original_msg = *s;
    1496 ECB             : 
    1497 GIC         835 :     if (in_streamed_transaction)
    1498 LBC           0 :         ereport(ERROR,
    1499                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1500 ECB             :                  errmsg_internal("duplicate STREAM START message")));
    1501                 : 
    1502                 :     /* There must not be an active streaming transaction. */
    1503 GNC         835 :     Assert(!TransactionIdIsValid(stream_xid));
    1504                 : 
    1505                 :     /* notify handle methods we're processing a remote transaction */
    1506 GIC         835 :     in_streamed_transaction = true;
    1507                 : 
    1508                 :     /* extract XID of the top-level transaction */
    1509             835 :     stream_xid = logicalrep_read_stream_start(s, &first_segment);
    1510                 : 
    1511 CBC         835 :     if (!TransactionIdIsValid(stream_xid))
    1512 LBC           0 :         ereport(ERROR,
    1513                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1514 ECB             :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
    1515                 : 
    1516 CBC         835 :     set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
    1517                 : 
    1518                 :     /* Try to allocate a worker for the streaming transaction. */
    1519 GNC         835 :     if (first_segment)
    1520              82 :         pa_allocate_worker(stream_xid);
    1521                 : 
    1522             835 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
    1523                 : 
    1524             835 :     switch (apply_action)
    1525                 :     {
    1526             341 :         case TRANS_LEADER_SERIALIZE:
    1527                 : 
    1528                 :             /*
    1529                 :              * Function stream_start_internal starts a transaction. This
    1530                 :              * transaction will be committed on the stream stop unless it is a
    1531                 :              * tablesync worker in which case it will be committed after
    1532                 :              * processing all the messages. We need this transaction for
    1533                 :              * handling the BufFile, used for serializing the streaming data
    1534                 :              * and subxact info.
    1535                 :              */
    1536             341 :             stream_start_internal(stream_xid, first_segment);
    1537             341 :             break;
    1538                 : 
    1539             241 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1540             241 :             Assert(winfo);
    1541 ECB             : 
    1542                 :             /*
    1543                 :              * Once we start serializing the changes, the parallel apply
    1544                 :              * worker will wait for the leader to release the stream lock
    1545                 :              * until the end of the transaction. So, we don't need to release
    1546                 :              * the lock or increment the stream count in that case.
    1547                 :              */
    1548 GNC         241 :             if (pa_send_data(winfo, s->len, s->data))
    1549                 :             {
    1550                 :                 /*
    1551                 :                  * Unlock the shared object lock so that the parallel apply
    1552                 :                  * worker can continue to receive changes.
    1553                 :                  */
    1554             237 :                 if (!first_segment)
    1555             214 :                     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1556                 : 
    1557                 :                 /*
    1558                 :                  * Increment the number of streaming blocks waiting to be
    1559                 :                  * processed by parallel apply worker.
    1560                 :                  */
    1561             237 :                 pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
    1562                 : 
    1563                 :                 /* Cache the parallel apply worker for this transaction. */
    1564             237 :                 pa_set_stream_apply_worker(winfo);
    1565             237 :                 break;
    1566                 :             }
    1567                 : 
    1568                 :             /*
    1569                 :              * Switch to serialize mode when we are not able to send the
    1570                 :              * change to parallel apply worker.
    1571                 :              */
    1572               4 :             pa_switch_to_partial_serialize(winfo, !first_segment);
    1573                 : 
    1574                 :             /* fall through */
    1575              15 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1576              15 :             Assert(winfo);
    1577                 : 
    1578                 :             /*
    1579                 :              * Open the spool file unless it was already opened when switching
    1580                 :              * to serialize mode. The transaction started in
    1581                 :              * stream_start_internal will be committed on the stream stop.
    1582                 :              */
    1583              15 :             if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
    1584              11 :                 stream_start_internal(stream_xid, first_segment);
    1585                 : 
    1586              15 :             stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
    1587                 : 
    1588                 :             /* Cache the parallel apply worker for this transaction. */
    1589              15 :             pa_set_stream_apply_worker(winfo);
    1590              15 :             break;
    1591                 : 
    1592             242 :         case TRANS_PARALLEL_APPLY:
    1593             242 :             if (first_segment)
    1594                 :             {
    1595                 :                 /* Hold the lock until the end of the transaction. */
    1596              27 :                 pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    1597              27 :                 pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
    1598                 : 
    1599                 :                 /*
    1600                 :                  * Signal the leader apply worker, as it may be waiting for
    1601                 :                  * us.
    1602                 :                  */
    1603              27 :                 logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
    1604                 :             }
    1605                 : 
    1606             242 :             parallel_stream_nchanges = 0;
    1607             242 :             break;
    1608                 : 
    1609 UNC           0 :         default:
    1610               0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1611                 :             break;
    1612                 :     }
    1613 ECB             : 
    1614 GIC         835 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1615 CBC         835 : }
    1616                 : 
    1617 ECB             : /*
    1618                 :  * Update the information about subxacts and close the file.
    1619                 :  *
    1620                 :  * This function should be called when the stream_start_internal function has
    1621                 :  * been called.
    1622                 :  */
    1623                 : void
    1624 GNC         361 : stream_stop_internal(TransactionId xid)
    1625 ECB             : {
    1626                 :     /*
    1627                 :      * Serialize information about subxacts for the toplevel transaction, then
    1628                 :      * close the stream messages spool file.
    1629                 :      */
    1630 GNC         361 :     subxact_info_write(MyLogicalRepWorker->subid, xid);
    1631 GIC         361 :     stream_close_file();
    1632                 : 
    1633                 :     /* We must be in a valid transaction state */
    1634             361 :     Assert(IsTransactionState());
    1635                 : 
    1636                 :     /* Commit the per-stream transaction */
    1637             361 :     CommitTransactionCommand();
    1638                 : 
    1639                 :     /* Reset per-stream context */
    1640 CBC         361 :     MemoryContextReset(LogicalStreamingContext);
    1641 GNC         361 : }
    1642                 : 
    1643                 : /*
    1644                 :  * Handle STREAM STOP message.
    1645                 :  */
    1646                 : static void
    1647             834 : apply_handle_stream_stop(StringInfo s)
    1648                 : {
    1649                 :     ParallelApplyWorkerInfo *winfo;
    1650                 :     TransApplyAction apply_action;
    1651                 : 
    1652             834 :     if (!in_streamed_transaction)
    1653 UNC           0 :         ereport(ERROR,
    1654                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1655                 :                  errmsg_internal("STREAM STOP message without STREAM START")));
    1656                 : 
    1657 GNC         834 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
    1658                 : 
    1659             834 :     switch (apply_action)
    1660                 :     {
    1661             341 :         case TRANS_LEADER_SERIALIZE:
    1662             341 :             stream_stop_internal(stream_xid);
    1663             341 :             break;
    1664                 : 
    1665             237 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1666             237 :             Assert(winfo);
    1667                 : 
    1668                 :             /*
    1669                 :              * Lock before sending the STREAM_STOP message so that the leader
    1670                 :              * can hold the lock first and the parallel apply worker will wait
    1671                 :              * for the leader to release the lock. See Locking Considerations atop
    1672                 :              * applyparallelworker.c.
    1673                 :              */
    1674             237 :             pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1675                 : 
    1676             237 :             if (pa_send_data(winfo, s->len, s->data))
    1677                 :             {
    1678             237 :                 pa_set_stream_apply_worker(NULL);
    1679             237 :                 break;
    1680                 :             }
    1681                 : 
    1682                 :             /*
    1683                 :              * Switch to serialize mode when we are not able to send the
    1684                 :              * change to parallel apply worker.
    1685                 :              */
    1686 UNC           0 :             pa_switch_to_partial_serialize(winfo, true);
    1687                 : 
    1688                 :             /* fall through */
    1689 GNC          15 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1690              15 :             stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
    1691              15 :             stream_stop_internal(stream_xid);
    1692              15 :             pa_set_stream_apply_worker(NULL);
    1693              15 :             break;
    1694                 : 
    1695             241 :         case TRANS_PARALLEL_APPLY:
    1696             241 :             elog(DEBUG1, "applied %u changes in the streaming chunk",
    1697                 :                  parallel_stream_nchanges);
    1698                 : 
    1699                 :             /*
    1700                 :              * By the time parallel apply worker is processing the changes in
    1701                 :              * the current streaming block, the leader apply worker may have
    1702                 :              * sent multiple streaming blocks. This can lead to the parallel apply
    1703                 :              * worker starting to wait even when there are more chunks of streams
    1704                 :              * in the queue. So, try to lock only if there is no message left
    1705                 :              * in the queue. See Locking Considerations atop
    1706                 :              * applyparallelworker.c.
    1707                 :              *
    1708                 :              * Note that here we have a race condition where we can start
    1709                 :              * waiting even when there are pending streaming chunks. This can
    1710                 :              * happen if the leader sends another streaming block and acquires
    1711                 :              * the stream lock again after the parallel apply worker checks
    1712                 :              * that there is no pending streaming block and before it actually
    1713                 :              * starts waiting on a lock. We can handle this case by not
    1714                 :              * allowing the leader to increment the stream block count during
    1715                 :              * the time parallel apply worker acquires the lock but it is not
    1716                 :              * clear whether that is worth the complexity.
    1717                 :              *
    1718                 :              * Now, if this missed chunk contains rollback to savepoint, then
    1719                 :              * there is a risk of deadlock which probably shouldn't happen
    1720                 :              * after restart.
    1721                 :              */
    1722             241 :             pa_decr_and_wait_stream_block();
    1723             239 :             break;
    1724                 : 
    1725 UNC           0 :         default:
    1726               0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1727                 :             break;
    1728                 :     }
    1729                 : 
    1730 GNC         832 :     in_streamed_transaction = false;
    1731             832 :     stream_xid = InvalidTransactionId;
    1732                 : 
    1733                 :     /*
    1734                 :      * The parallel apply worker could be in a transaction in which case we
    1735                 :      * need to report the state as STATE_IDLEINTRANSACTION.
    1736                 :      */
    1737             832 :     if (IsTransactionOrTransactionBlock())
    1738             239 :         pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
    1739                 :     else
    1740             593 :         pgstat_report_activity(STATE_IDLE, NULL);
    1741                 : 
    1742 GIC         832 :     reset_apply_error_context_info();
    1743             832 : }
    1744 ECB             : 
    1745                 : /*
    1746                 :  * Helper function to handle STREAM ABORT message when the transaction was
    1747                 :  * serialized to file.
    1748                 :  */
    1749                 : static void
    1750 GNC          14 : stream_abort_internal(TransactionId xid, TransactionId subxid)
    1751                 : {
    1752                 :     /*
    1753                 :      * If the two XIDs are the same, it's in fact abort of toplevel xact, so
    1754                 :      * just delete the files with serialized info.
    1755 ECB             :      */
    1756 CBC          14 :     if (xid == subxid)
    1757               1 :         stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    1758 ECB             :     else
    1759                 :     {
    1760                 :         /*
    1761                 :          * OK, so it's a subxact. We need to read the subxact file for the
    1762                 :          * toplevel transaction, determine the offset tracked for the subxact,
    1763                 :          * and truncate the file with changes. We also remove the subxacts
    1764                 :          * with higher offsets (or rather higher XIDs).
    1765                 :          *
    1766                 :          * We intentionally scan the array from the tail, because we're likely
    1767                 :          * aborting a change for the most recent subtransactions.
    1768                 :          *
    1769                 :          * We can't use the binary search here as subxact XIDs won't
    1770                 :          * necessarily arrive in sorted order; consider the case where we have
    1771                 :          * released the savepoint for multiple subtransactions and then
    1772                 :          * performed rollback to savepoint for one of the earlier
    1773                 :          * sub-transactions.
    1774                 :          */
    1775                 :         int64       i;
    1776                 :         int64       subidx;
    1777                 :         BufFile    *fd;
    1778 GIC          13 :         bool        found = false;
    1779                 :         char        path[MAXPGPATH];
    1780 ECB             : 
    1781 GIC          13 :         subidx = -1;
    1782              13 :         begin_replication_step();
    1783              13 :         subxact_info_read(MyLogicalRepWorker->subid, xid);
    1784                 : 
    1785 CBC          15 :         for (i = subxact_data.nsubxacts; i > 0; i--)
    1786                 :         {
    1787              11 :             if (subxact_data.subxacts[i - 1].xid == subxid)
    1788 EUB             :             {
    1789 GIC           9 :                 subidx = (i - 1);
    1790               9 :                 found = true;
    1791               9 :                 break;
    1792                 :             }
    1793 ECB             :         }
    1794 EUB             : 
    1795                 :         /*
    1796                 :          * If it's an empty sub-transaction then we will not find the subxid
    1797                 :          * here, so just clean up the subxact info and return.
    1798 ECB             :          */
    1799 CBC          13 :         if (!found)
    1800                 :         {
    1801 ECB             :             /* Cleanup the subxact info */
    1802 GIC           4 :             cleanup_subxact_info();
    1803 CBC           4 :             end_replication_step();
    1804 GIC           4 :             CommitTransactionCommand();
    1805               4 :             return;
    1806                 :         }
    1807                 : 
    1808                 :         /* open the changes file */
    1809               9 :         changes_filename(path, MyLogicalRepWorker->subid, xid);
    1810 CBC           9 :         fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
    1811                 :                                 O_RDWR, false);
    1812                 : 
    1813                 :         /* OK, truncate the file at the right offset */
    1814               9 :         BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
    1815 GIC           9 :                                subxact_data.subxacts[subidx].offset);
    1816 CBC           9 :         BufFileClose(fd);
    1817                 : 
    1818 ECB             :         /* discard the subxacts added later */
    1819 GIC           9 :         subxact_data.nsubxacts = subidx;
    1820 ECB             : 
    1821                 :         /* write the updated subxact list */
    1822 GIC           9 :         subxact_info_write(MyLogicalRepWorker->subid, xid);
    1823 ECB             : 
    1824 GIC           9 :         end_replication_step();
    1825 CBC           9 :         CommitTransactionCommand();
    1826 ECB             :     }
    1827                 : }
    1828                 : 
    1829                 : /*
    1830                 :  * Handle STREAM ABORT message.
    1831                 :  */
    1832                 : static void
    1833 GNC          38 : apply_handle_stream_abort(StringInfo s)
    1834                 : {
    1835                 :     TransactionId xid;
    1836                 :     TransactionId subxid;
    1837                 :     LogicalRepStreamAbortData abort_data;
    1838                 :     ParallelApplyWorkerInfo *winfo;
    1839                 :     TransApplyAction apply_action;
    1840                 : 
    1841                 :     /* Save the message before it is consumed. */
    1842              38 :     StringInfoData original_msg = *s;
    1843                 :     bool        toplevel_xact;
    1844                 : 
    1845              38 :     if (in_streamed_transaction)
    1846 UNC           0 :         ereport(ERROR,
    1847                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1848                 :                  errmsg_internal("STREAM ABORT message without STREAM STOP")));
    1849                 : 
    1850                 :     /* We receive abort information only when we can apply in parallel. */
    1851 GNC          38 :     logicalrep_read_stream_abort(s, &abort_data,
    1852              38 :                                  MyLogicalRepWorker->parallel_apply);
    1853                 : 
    1854              38 :     xid = abort_data.xid;
    1855              38 :     subxid = abort_data.subxid;
    1856              38 :     toplevel_xact = (xid == subxid);
    1857                 : 
    1858              38 :     set_apply_error_context_xact(subxid, abort_data.abort_lsn);
    1859                 : 
    1860              38 :     apply_action = get_transaction_apply_action(xid, &winfo);
    1861                 : 
    1862              38 :     switch (apply_action)
    1863                 :     {
    1864              14 :         case TRANS_LEADER_APPLY:
    1865                 : 
    1866                 :             /*
    1867                 :              * We are in the leader apply worker and the transaction has been
    1868                 :              * serialized to file.
    1869                 :              */
    1870              14 :             stream_abort_internal(xid, subxid);
    1871                 : 
    1872              14 :             elog(DEBUG1, "finished processing the STREAM ABORT command");
    1873              14 :             break;
    1874                 : 
    1875              10 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1876              10 :             Assert(winfo);
    1877                 : 
    1878                 :             /*
    1879                 :              * For the case of aborting the subtransaction, we increment the
    1880                 :              * number of streaming blocks and take the lock again before
    1881                 :              * sending the STREAM_ABORT to ensure that the parallel apply
    1882                 :              * worker will wait on the lock for the next set of changes after
    1883                 :              * processing the STREAM_ABORT message if it is not already
    1884                 :              * waiting for STREAM_STOP message.
    1885                 :              *
    1886                 :              * It is important to perform this locking before sending the
    1887                 :              * STREAM_ABORT message so that the leader can hold the lock first
    1888                 :              * and the parallel apply worker will wait for the leader to
    1889                 :              * release the lock. This is the same as what we do in
    1890                 :              * apply_handle_stream_stop. See Locking Considerations atop
    1891                 :              * applyparallelworker.c.
    1892                 :              */
    1893              10 :             if (!toplevel_xact)
    1894                 :             {
    1895               9 :                 pa_unlock_stream(xid, AccessExclusiveLock);
    1896               9 :                 pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
    1897               9 :                 pa_lock_stream(xid, AccessExclusiveLock);
    1898                 :             }
    1899                 : 
    1900              10 :             if (pa_send_data(winfo, s->len, s->data))
    1901                 :             {
    1902                 :                 /*
    1903                 :                  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
    1904                 :                  * wait here for the parallel apply worker to finish as that
    1905                 :                  * is not required to maintain the commit order and won't have
    1906                 :                  * the risk of failures due to transaction dependencies and
    1907                 :                  * deadlocks. However, it is possible that before the parallel
    1908                 :                  * worker finishes and we clear the worker info, the xid
    1909                 :                  * wraparound happens on the upstream and a new transaction
    1910                 :                  * with the same xid can appear and that can lead to duplicate
    1911                 :                  * entries in ParallelApplyTxnHash. Yet another problem could
    1912                 :                  * be that we may have serialized the changes in partial
    1913                 :                  * serialize mode and the file containing xact changes may
    1914                 :                  * already exist, and after xid wraparound trying to create
    1915                 :                  * the file for the same xid can lead to an error. To avoid
    1916                 :                  * these problems, we decide to wait for the aborts to finish.
    1917                 :                  *
    1918                 :                  * Note, it is okay to not update the flush location position
    1919                 :                  * for aborts as in worst case that means such a transaction
    1920                 :                  * won't be sent again after restart.
    1921                 :                  */
    1922              10 :                 if (toplevel_xact)
    1923               1 :                     pa_xact_finish(winfo, InvalidXLogRecPtr);
    1924                 : 
    1925              10 :                 break;
    1926                 :             }
    1927                 : 
    1928                 :             /*
    1929                 :              * Switch to serialize mode when we are not able to send the
    1930                 :              * change to parallel apply worker.
    1931                 :              */
    1932 UNC           0 :             pa_switch_to_partial_serialize(winfo, true);
    1933                 : 
    1934                 :             /* fall through */
    1935 GNC           2 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1936               2 :             Assert(winfo);
    1937                 : 
    1938                 :             /*
    1939                 :              * Parallel apply worker might have applied some changes, so write
    1940                 :              * the STREAM_ABORT message so that it can roll back the
    1941                 :              * subtransaction if needed.
    1942                 :              */
    1943               2 :             stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
    1944                 :                                          &original_msg);
    1945                 : 
    1946               2 :             if (toplevel_xact)
    1947                 :             {
    1948               1 :                 pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    1949               1 :                 pa_xact_finish(winfo, InvalidXLogRecPtr);
    1950                 :             }
    1951               2 :             break;
    1952                 : 
    1953              12 :         case TRANS_PARALLEL_APPLY:
    1954                 : 
    1955                 :             /*
    1956                 :              * If the parallel apply worker is applying spooled messages then
    1957                 :              * close the file before aborting.
    1958                 :              */
    1959              12 :             if (toplevel_xact && stream_fd)
    1960               1 :                 stream_close_file();
    1961                 : 
    1962              12 :             pa_stream_abort(&abort_data);
    1963                 : 
    1964                 :             /*
    1965                 :              * We need to wait after processing rollback to savepoint for the
    1966                 :              * next set of changes.
    1967                 :              *
    1968                 :              * We have a race condition here due to which we can start waiting
    1969                 :              * here when there are more chunks of streams in the queue. See
    1970                 :              * apply_handle_stream_stop.
    1971                 :              */
    1972              12 :             if (!toplevel_xact)
    1973              10 :                 pa_decr_and_wait_stream_block();
    1974                 : 
    1975              12 :             elog(DEBUG1, "finished processing the STREAM ABORT command");
    1976              12 :             break;
    1977                 : 
    1978 UNC           0 :         default:
    1979               0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1980                 :             break;
    1981                 :     }
    1982                 : 
    1983 CBC          38 :     reset_apply_error_context_info();
    1984              38 : }
    1985                 : 
    1986 ECB             : /*
    1987                 :  * Ensure that the passed location is the fileset's end.
    1988                 :  */
    1989                 : static void
    1990 GNC           4 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
    1991                 :                     off_t offset)
    1992                 : {
    1993                 :     char        path[MAXPGPATH];
    1994                 :     BufFile    *fd;
    1995                 :     int         last_fileno;
    1996                 :     off_t       last_offset;
    1997                 : 
    1998               4 :     Assert(!IsTransactionState());
    1999                 : 
    2000               4 :     begin_replication_step();
    2001                 : 
    2002               4 :     changes_filename(path, MyLogicalRepWorker->subid, xid);
    2003                 : 
    2004               4 :     fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
    2005                 : 
    2006               4 :     BufFileSeek(fd, 0, 0, SEEK_END);
    2007               4 :     BufFileTell(fd, &last_fileno, &last_offset);
    2008                 : 
    2009               4 :     BufFileClose(fd);
    2010                 : 
    2011               4 :     end_replication_step();
    2012                 : 
    2013               4 :     if (last_fileno != fileno || last_offset != offset)
    2014 UNC           0 :         elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
    2015                 :              path);
    2016 GNC           4 : }
    2017                 : 
    2018                 : /*
    2019                 :  * Common spoolfile processing.
    2020                 :  */
    2021                 : void
    2022              31 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
    2023                 :                        XLogRecPtr lsn)
    2024                 : {
    2025                 :     StringInfoData s2;
    2026                 :     int         nchanges;
    2027                 :     char        path[MAXPGPATH];
    2028 GIC          31 :     char       *buffer = NULL;
    2029                 :     MemoryContext oldcxt;
    2030                 :     ResourceOwner oldowner;
    2031                 :     int         fileno;
    2032                 :     off_t       offset;
    2033                 : 
    2034 GNC          31 :     if (!am_parallel_apply_worker())
    2035              27 :         maybe_start_skipping_changes(lsn);
    2036 ECB             : 
    2037                 :     /* Make sure we have an open transaction */
    2038 GIC          31 :     begin_replication_step();
    2039 ECB             : 
    2040                 :     /*
    2041                 :      * Allocate file handle and memory required to process all the messages in
    2042                 :      * TopTransactionContext to avoid them getting reset after each message is
    2043                 :      * processed.
    2044                 :      */
    2045 GIC          31 :     oldcxt = MemoryContextSwitchTo(TopTransactionContext);
    2046 ECB             : 
    2047                 :     /* Open the spool file for the committed/prepared transaction */
    2048 GIC          31 :     changes_filename(path, MyLogicalRepWorker->subid, xid);
    2049 CBC          31 :     elog(DEBUG1, "replaying changes from file \"%s\"", path);
    2050                 : 
    2051                 :     /*
    2052                 :      * Make sure the file is owned by the toplevel transaction so that the
    2053                 :      * file will not be accidentally closed when aborting a subtransaction.
    2054                 :      */
    2055 GNC          31 :     oldowner = CurrentResourceOwner;
    2056              31 :     CurrentResourceOwner = TopTransactionResourceOwner;
    2057                 : 
    2058              31 :     stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
    2059                 : 
    2060              31 :     CurrentResourceOwner = oldowner;
    2061                 : 
    2062 GIC          31 :     buffer = palloc(BLCKSZ);
    2063 CBC          31 :     initStringInfo(&s2);
    2064 ECB             : 
    2065 GIC          31 :     MemoryContextSwitchTo(oldcxt);
    2066 ECB             : 
    2067 GIC          31 :     remote_final_lsn = lsn;
    2068                 : 
    2069 ECB             :     /*
    2070                 :      * Make sure the apply_dispatch handler methods are aware we're in a remote
    2071                 :      * transaction.
    2072                 :      */
    2073 CBC          31 :     in_remote_transaction = true;
    2074 GIC          31 :     pgstat_report_activity(STATE_RUNNING, NULL);
    2075 ECB             : 
    2076 GIC          31 :     end_replication_step();
    2077 ECB             : 
    2078                 :     /*
    2079                 :      * Read the entries one by one and pass them through the same logic as in
    2080                 :      * apply_dispatch.
    2081                 :      */
    2082 CBC          31 :     nchanges = 0;
    2083 ECB             :     while (true)
    2084 GIC       88470 :     {
    2085 EUB             :         size_t      nbytes;
    2086                 :         int         len;
    2087                 : 
    2088 GIC       88501 :         CHECK_FOR_INTERRUPTS();
    2089                 : 
    2090 ECB             :         /* read length of the on-disk record */
    2091 GNC       88501 :         nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
    2092                 : 
    2093 ECB             :         /* have we reached end of the file? */
    2094 GIC       88501 :         if (nbytes == 0)
    2095              26 :             break;
    2096                 : 
    2097                 :         /* do we have a correct length? */
    2098 CBC       88475 :         if (len <= 0)
    2099 LBC           0 :             elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
    2100                 :                  len, path);
    2101                 : 
    2102                 :         /* make sure we have sufficiently large buffer */
    2103 GIC       88475 :         buffer = repalloc(buffer, len);
    2104                 : 
    2105                 :         /* and finally read the data into the buffer */
    2106 GNC       88475 :         BufFileReadExact(stream_fd, buffer, len);
    2107                 : 
    2108           88475 :         BufFileTell(stream_fd, &fileno, &offset);
    2109                 : 
    2110 ECB             :         /* copy the buffer to the stringinfo and call apply_dispatch */
    2111 CBC       88475 :         resetStringInfo(&s2);
    2112           88475 :         appendBinaryStringInfo(&s2, buffer, len);
    2113 EUB             : 
    2114                 :         /* Ensure we are reading the data into our memory context. */
    2115 GIC       88475 :         oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
    2116 ECB             : 
    2117 GIC       88475 :         apply_dispatch(&s2);
    2118                 : 
    2119           88474 :         MemoryContextReset(ApplyMessageContext);
    2120                 : 
    2121           88474 :         MemoryContextSwitchTo(oldcxt);
    2122                 : 
    2123           88474 :         nchanges++;
    2124                 : 
    2125                 :         /*
    2126                 :          * It is possible the file has been closed because we have processed
    2127                 :          * the transaction end message like stream_commit, in which case that
    2128                 :          * must be the last message.
    2129                 :          */
    2130 GNC       88474 :         if (!stream_fd)
    2131                 :         {
    2132               4 :             ensure_last_message(stream_fileset, xid, fileno, offset);
    2133               4 :             break;
    2134                 :         }
    2135                 : 
    2136 CBC       88470 :         if (nchanges % 1000 == 0)
    2137 GIC          84 :             elog(DEBUG1, "replayed %d changes from file \"%s\"",
    2138 ECB             :                  nchanges, path);
    2139                 :     }
    2140                 : 
    2141 GNC          30 :     if (stream_fd)
    2142              26 :         stream_close_file();
    2143                 : 
    2144 GIC          30 :     elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
    2145                 :          nchanges, path);
    2146 ECB             : 
    2147 GIC          30 :     return;
    2148                 : }
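apply_spooled_messages() reads the spool file as a sequence of length-prefixed records. Each record is an int length followed by that many bytes, and the loop stops cleanly when the length read returns zero bytes. The sketch below models that loop with standard C stdio. write_record(), replay_records() and sum_lengths() are hypothetical names. Unlike the real code, the sketch reports a bad length or a truncated record by returning -1 rather than raising an ERROR.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical callback standing in for apply_dispatch(). */
typedef void (*record_handler) (const char *data, int len, void *arg);

/* Append one record: an int length header followed by the payload. */
static int
write_record(FILE *fp, const char *data, int len)
{
    if (fwrite(&len, sizeof(len), 1, fp) != 1)
        return -1;
    return fwrite(data, 1, (size_t) len, fp) == (size_t) len ? 0 : -1;
}

/*
 * Replay every record from the start of fp. Returns the number of records
 * handled, or -1 on a bad length or a truncated record.
 */
static int
replay_records(FILE *fp, record_handler handler, void *arg)
{
    char       *buffer = NULL;
    int         nchanges = 0;

    rewind(fp);
    for (;;)
    {
        int         len;
        size_t      nbytes = fread(&len, 1, sizeof(len), fp);
        char       *tmp;

        if (nbytes == 0)
            break;              /* clean end of file */
        if (nbytes != sizeof(len) || len <= 0)
            goto fail;          /* partial header or bogus length */

        tmp = realloc(buffer, (size_t) len);    /* size buffer to record */
        if (tmp == NULL)
            goto fail;
        buffer = tmp;

        if (fread(buffer, 1, (size_t) len, fp) != (size_t) len)
            goto fail;          /* payload shorter than its header claims */

        handler(buffer, len, arg);
        nchanges++;
    }
    free(buffer);
    return nchanges;

fail:
    free(buffer);
    return -1;
}

/* Example handler: adds each record's length to a long counter. */
static void
sum_lengths(const char *data, int len, void *arg)
{
    (void) data;
    *(long *) arg += len;
}
```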
    2149                 : 
    2150 ECB             : /*
    2151                 :  * Handle STREAM COMMIT message.
    2152                 :  */
    2153                 : static void
    2154 GIC          61 : apply_handle_stream_commit(StringInfo s)
    2155 ECB             : {
    2156                 :     TransactionId xid;
    2157                 :     LogicalRepCommitData commit_data;
    2158                 :     ParallelApplyWorkerInfo *winfo;
    2159                 :     TransApplyAction apply_action;
    2160                 : 
    2161                 :     /* Save the message before it is consumed. */
    2162 GNC          61 :     StringInfoData original_msg = *s;
    2163                 : 
    2164 CBC          61 :     if (in_streamed_transaction)
    2165 UIC           0 :         ereport(ERROR,
    2166                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    2167 ECB             :                  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
    2168                 : 
    2169 GIC          61 :     xid = logicalrep_read_stream_commit(s, &commit_data);
    2170 CBC          61 :     set_apply_error_context_xact(xid, commit_data.commit_lsn);
    2171 ECB             : 
    2172 GNC          61 :     apply_action = get_transaction_apply_action(xid, &winfo);
    2173                 : 
    2174              61 :     switch (apply_action)
    2175                 :     {
    2176              22 :         case TRANS_LEADER_APPLY:
    2177                 : 
    2178                 :             /*
    2179                 :              * The transaction has been serialized to file, so replay all the
    2180                 :              * spooled operations.
    2181                 :              */
    2182              22 :             apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
    2183                 :                                    commit_data.commit_lsn);
    2184 ECB             : 
    2185 GNC          21 :             apply_handle_commit_internal(&commit_data);
    2186                 : 
    2187                 :             /* Unlink the files with serialized changes and subxact info. */
    2188              21 :             stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    2189                 : 
    2190              21 :             elog(DEBUG1, "finished processing the STREAM COMMIT command");
    2191              21 :             break;
    2192                 : 
    2193              18 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    2194              18 :             Assert(winfo);
    2195                 : 
    2196              18 :             if (pa_send_data(winfo, s->len, s->data))
    2197                 :             {
    2198                 :                 /* Finish processing the streaming transaction. */
    2199              18 :                 pa_xact_finish(winfo, commit_data.end_lsn);
    2200              17 :                 break;
    2201                 :             }
    2202                 : 
    2203                 :             /*
    2204                 :              * Switch to serialize mode when we are not able to send the
    2205                 :              * change to the parallel apply worker.
    2206                 :              */
    2207 UNC           0 :             pa_switch_to_partial_serialize(winfo, true);
    2208                 : 
    2209                 :             /* fall through */
    2210 GNC           2 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    2211               2 :             Assert(winfo);
    2212                 : 
    2213               2 :             stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
    2214                 :                                          &original_msg);
    2215                 : 
    2216               2 :             pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    2217                 : 
    2218                 :             /* Finish processing the streaming transaction. */
    2219               2 :             pa_xact_finish(winfo, commit_data.end_lsn);
    2220               2 :             break;
    2221                 : 
    2222              19 :         case TRANS_PARALLEL_APPLY:
    2223                 : 
    2224                 :             /*
    2225                 :              * If the parallel apply worker is applying spooled messages then
    2226                 :              * close the file before committing.
    2227                 :              */
    2228              19 :             if (stream_fd)
    2229               2 :                 stream_close_file();
    2230                 : 
    2231              19 :             apply_handle_commit_internal(&commit_data);
    2232                 : 
    2233              19 :             MyParallelShared->last_commit_end = XactLastCommitEnd;
    2234                 : 
    2235                 :             /*
    2236                 :              * It is important to set the transaction state as finished before
    2237                 :              * releasing the lock. See pa_wait_for_xact_finish.
    2238                 :              */
    2239              19 :             pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    2240              19 :             pa_unlock_transaction(xid, AccessExclusiveLock);
    2241                 : 
    2242              19 :             pa_reset_subtrans();
    2243                 : 
    2244              19 :             elog(DEBUG1, "finished processing the STREAM COMMIT command");
    2245              19 :             break;
    2246                 : 
    2247 UNC           0 :         default:
    2248               0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    2249                 :             break;
    2250                 :     }
    2251                 : 
    2252                 :     /* Process any tables that are being synchronized in parallel. */
    2253 GIC          59 :     process_syncing_tables(commit_data.end_lsn);
    2254                 : 
    2255 CBC          59 :     pgstat_report_activity(STATE_IDLE, NULL);
    2256                 : 
    2257              59 :     reset_apply_error_context_info();
    2258 GBC          59 : }
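The switch in apply_handle_stream_commit() routes the commit according to the transaction's apply action. When pa_send_data() fails, control deliberately falls through into the partial-serialize branch. The toy model below shows only that routing. It assumes the subset of TransApplyAction that this handler accepts, since the real enum has other members that would hit the default error. The CommitOutcome codes are hypothetical stand-ins for the actual side effects.

```c
#include <assert.h>
#include <stdbool.h>

/* Subset of TransApplyAction handled by apply_handle_stream_commit(). */
typedef enum
{
    TRANS_LEADER_APPLY,
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_LEADER_PARTIAL_SERIALIZE,
    TRANS_PARALLEL_APPLY
} SketchApplyAction;

/* Hypothetical outcome codes describing what the handler ends up doing. */
typedef enum
{
    OUTCOME_REPLAYED_SPOOL_AND_COMMITTED,
    OUTCOME_SENT_TO_PARALLEL_WORKER,
    OUTCOME_SERIALIZED_FOR_PARALLEL_WORKER,
    OUTCOME_COMMITTED_IN_PARALLEL_WORKER
} CommitOutcome;

/*
 * 'send_ok' stands in for the result of pa_send_data(). A failed send
 * falls through to the partial-serialize branch, as the real code does
 * after calling pa_switch_to_partial_serialize().
 */
static CommitOutcome
route_stream_commit(SketchApplyAction action, bool send_ok)
{
    switch (action)
    {
        case TRANS_LEADER_APPLY:
            return OUTCOME_REPLAYED_SPOOL_AND_COMMITTED;

        case TRANS_LEADER_SEND_TO_PARALLEL:
            if (send_ok)
                return OUTCOME_SENT_TO_PARALLEL_WORKER;
            /* fall through */
        case TRANS_LEADER_PARTIAL_SERIALIZE:
            return OUTCOME_SERIALIZED_FOR_PARALLEL_WORKER;

        case TRANS_PARALLEL_APPLY:
            return OUTCOME_COMMITTED_IN_PARALLEL_WORKER;
    }
    assert(!"unexpected apply action");
    return OUTCOME_REPLAYED_SPOOL_AND_COMMITTED;
}
```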
    2259                 : 
    2260                 : /*
    2261                 :  * Helper function for apply_handle_commit and apply_handle_stream_commit.
    2262                 :  */
    2263 ECB             : static void
    2264 GIC         395 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
    2265                 : {
    2266 CBC         395 :     if (is_skipping_changes())
    2267                 :     {
    2268 GIC           2 :         stop_skipping_changes();
    2269 ECB             : 
    2270                 :         /*
    2271                 :          * Start a new transaction to clear the subskiplsn, if not started
    2272 EUB             :          * yet.
    2273                 :          */
    2274 GIC           2 :         if (!IsTransactionState())
    2275               1 :             StartTransactionCommand();
    2276 ECB             :     }
    2277                 : 
    2278 GIC         395 :     if (IsTransactionState())
    2279 ECB             :     {
    2280                 :         /*
    2281                 :          * The transaction is either non-empty or skipped, so we clear the
    2282                 :          * subskiplsn.
    2283                 :          */
    2284 CBC         395 :         clear_subscription_skip_lsn(commit_data->commit_lsn);
    2285                 : 
    2286 ECB             :         /*
    2287                 :          * Update origin state so we can restart streaming from correct
    2288                 :          * position in case of crash.
    2289                 :          */
    2290 GIC         395 :         replorigin_session_origin_lsn = commit_data->end_lsn;
    2291             395 :         replorigin_session_origin_timestamp = commit_data->committime;
    2292                 : 
    2293             395 :         CommitTransactionCommand();
    2294                 : 
    2295 GNC         395 :         if (IsTransactionBlock())
    2296                 :         {
    2297               4 :             EndTransactionBlock(false);
    2298               4 :             CommitTransactionCommand();
    2299                 :         }
    2300                 : 
    2301 GIC         395 :         pgstat_report_stat(false);
    2302                 : 
    2303 GNC         395 :         store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
    2304 ECB             :     }
    2305                 :     else
    2306                 :     {
    2307                 :         /* Process any invalidation messages that might have accumulated. */
    2308 UIC           0 :         AcceptInvalidationMessages();
    2309               0 :         maybe_reread_subscription();
    2310                 :     }
    2311                 : 
    2312 GIC         395 :     in_remote_transaction = false;
    2313             395 : }
    2314                 : 
    2315 ECB             : /*
    2316                 :  * Handle RELATION message.
    2317                 :  *
    2318                 :  * Note we don't do validation against local schema here. The validation
    2319                 :  * against local schema is postponed until the first change for a given
    2320                 :  * relation arrives, since we only care about it when applying changes for
    2321                 :  * it anyway, and we do less locking this way.
    2322                 :  */
    2323                 : static void
    2324 GIC         390 : apply_handle_relation(StringInfo s)
    2325                 : {
    2326                 :     LogicalRepRelation *rel;
    2327                 : 
    2328 CBC         390 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
    2329 GIC          36 :         return;
    2330                 : 
    2331 CBC         354 :     rel = logicalrep_read_rel(s);
    2332             354 :     logicalrep_relmap_update(rel);
    2333                 : 
    2334                 :     /* Also reset all entries in the partition map that refer to remoterel. */
    2335 GIC         354 :     logicalrep_partmap_reset_relmap(rel);
    2336                 : }
    2337                 : 
    2338                 : /*
    2339 ECB             :  * Handle TYPE message.
    2340                 :  *
    2341                 :  * This implementation pays no attention to TYPE messages; we expect the user
    2342                 :  * to have set things up so that the incoming data is acceptable to the input
    2343                 :  * functions for the locally subscribed tables.  Hence, we just read and
    2344                 :  * discard the message.
    2345                 :  */
    2346                 : static void
    2347 GIC          18 : apply_handle_type(StringInfo s)
    2348                 : {
    2349                 :     LogicalRepTyp typ;
    2350 ECB             : 
    2351 CBC          18 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
    2352 UIC           0 :         return;
    2353 ECB             : 
    2354 GIC          18 :     logicalrep_read_typ(s, &typ);
    2355                 : }
    2356 ECB             : 
    2357                 : /*
    2358 EUB             :  * Check that we (the subscription owner) have sufficient privileges on the
    2359                 :  * target relation to perform the given operation.
    2360                 :  */
    2361                 : static void
    2362 GIC      220052 : TargetPrivilegesCheck(Relation rel, AclMode mode)
    2363 ECB             : {
    2364                 :     Oid         relid;
    2365                 :     AclResult   aclresult;
    2366                 : 
    2367 GIC      220052 :     relid = RelationGetRelid(rel);
    2368          220052 :     aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
    2369          220052 :     if (aclresult != ACLCHECK_OK)
    2370               7 :         aclcheck_error(aclresult,
    2371               7 :                        get_relkind_objtype(rel->rd_rel->relkind),
    2372               7 :                        get_rel_name(relid));
    2373 ECB             : 
    2374                 :     /*
    2375                 :      * We lack the infrastructure to honor RLS policies.  It might be possible
    2376                 :      * to add such infrastructure here, but tablesync workers lack it, too, so
    2377                 :      * we don't bother.  RLS does not ordinarily apply to TRUNCATE commands,
    2378                 :      * but it seems dangerous to replicate a TRUNCATE and then refuse to
    2379                 :      * replicate subsequent INSERTs, so we forbid all commands the same.
    2380                 :      */
    2381 GIC      220045 :     if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
    2382               3 :         ereport(ERROR,
    2383 ECB             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2384                 :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
    2385                 :                         GetUserNameFromId(GetUserId(), true),
    2386                 :                         RelationGetRelationName(rel))));
    2387 GIC      220042 : }
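TargetPrivilegesCheck() is a two-stage test. The subscription owner must first hold the requested privilege on the target relation, and even then every command is refused if row-level security would apply. The sketch below shows that ordering. It assumes hypothetical SKETCH_ACL_* bits (PostgreSQL's real AclMode values differ) and uses a boolean in place of check_enable_rls(). For the single-bit modes this handler is called with, "holds all requested bits" and "holds any requested bit" give the same answer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical privilege bits; not PostgreSQL's real AclMode values. */
#define SKETCH_ACL_INSERT   (UINT32_C(1) << 0)
#define SKETCH_ACL_UPDATE   (UINT32_C(1) << 1)
#define SKETCH_ACL_DELETE   (UINT32_C(1) << 2)
#define SKETCH_ACL_TRUNCATE (UINT32_C(1) << 3)

typedef enum
{
    TARGET_OK,
    TARGET_PERMISSION_DENIED,
    TARGET_RLS_NOT_SUPPORTED
} TargetCheckResult;

/*
 * First require the requested privilege, then refuse any command at all
 * (TRUNCATE included) when row-level security would apply to this user.
 */
static TargetCheckResult
sketch_target_check(uint32_t granted, uint32_t requested, bool rls_enabled)
{
    if ((granted & requested) != requested)
        return TARGET_PERMISSION_DENIED;
    if (rls_enabled)
        return TARGET_RLS_NOT_SUPPORTED;
    return TARGET_OK;
}
```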
    2388                 : 
    2389 ECB             : /*
    2390                 :  * Handle INSERT message.
    2391                 :  */
    2392                 : 
    2393                 : static void
    2394 GIC      185646 : apply_handle_insert(StringInfo s)
    2395                 : {
    2396 ECB             :     LogicalRepRelMapEntry *rel;
    2397                 :     LogicalRepTupleData newtup;
    2398                 :     LogicalRepRelId relid;
    2399                 :     UserContext     ucxt;
    2400                 :     ApplyExecutionData *edata;
    2401                 :     EState     *estate;
    2402                 :     TupleTableSlot *remoteslot;
    2403 EUB             :     MemoryContext oldctx;
    2404                 :     bool        run_as_owner;
    2405                 : 
    2406                 :     /*
    2407                 :      * Quick return if we are skipping data modification changes or handling
    2408 ECB             :      * streamed transactions.
    2409                 :      */
    2410 CBC      361290 :     if (is_skipping_changes() ||
    2411 GIC      175644 :         handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
    2412 CBC      110046 :         return;
    2413 ECB             : 
    2414 CBC       75638 :     begin_replication_step();
    2415                 : 
    2416           75638 :     relid = logicalrep_read_insert(s, &newtup);
    2417           75638 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2418 GIC       75631 :     if (!should_apply_changes_for_rel(rel))
    2419                 :     {
    2420                 :         /*
    2421                 :          * The relation can't become interesting in the middle of the
    2422                 :          * transaction so it's safe to unlock it.
    2423                 :          */
    2424              38 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2425 CBC          38 :         end_replication_step();
    2426 GIC          38 :         return;
    2427 ECB             :     }
    2428                 : 
    2429                 :     /*
    2430                 :      * Make sure that any user-supplied code runs as the table owner, unless
    2431                 :      * the user has opted out of that behavior.
    2432                 :      */
    2433 GNC       75593 :     run_as_owner = MySubscription->runasowner;
    2434           75593 :     if (!run_as_owner)
    2435           75585 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2436                 : 
    2437 ECB             :     /* Set relation for error callback */
    2438 CBC       75593 :     apply_error_callback_arg.rel = rel;
    2439                 : 
    2440                 :     /* Initialize the executor state. */
    2441 GIC       75593 :     edata = create_edata_for_relation(rel);
    2442           75593 :     estate = edata->estate;
    2443           75593 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2444           75593 :                                         RelationGetDescr(rel->localrel),
    2445 EUB             :                                         &TTSOpsVirtual);
    2446                 : 
    2447                 :     /* Process and store remote tuple in the slot */
    2448 CBC       75593 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2449           75593 :     slot_store_data(remoteslot, rel, &newtup);
    2450           75593 :     slot_fill_defaults(rel, estate, remoteslot);
    2451           75593 :     MemoryContextSwitchTo(oldctx);
    2452 ECB             : 
    2453                 :     /* For a partitioned table, insert the tuple into a partition. */
    2454 CBC       75593 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2455              44 :         apply_handle_tuple_routing(edata,
    2456                 :                                    remoteslot, NULL, CMD_INSERT);
    2457                 :     else
    2458 GIC       75549 :         apply_handle_insert_internal(edata, edata->targetRelInfo,
    2459                 :                                      remoteslot);
    2460                 : 
    2461           75578 :     finish_edata(edata);
    2462                 : 
    2463                 :     /* Reset relation for error callback */
    2464           75578 :     apply_error_callback_arg.rel = NULL;
    2465                 : 
    2466 GNC       75578 :     if (!run_as_owner)
    2467           75573 :         RestoreUserContext(&ucxt);
    2468                 : 
    2469 GIC       75578 :     logicalrep_rel_close(rel, NoLock);
    2470                 : 
    2471           75578 :     end_replication_step();
    2472                 : }
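apply_handle_insert() switches to the table owner before running executor code and restores the previous identity afterwards. Both steps are guarded by the same run_as_owner flag. The sketch below shows only that pairing. The user ids, sketch_switch_to_user() and sketch_restore_user() are hypothetical. It also leaves out the extra protections that SwitchToUntrustedUser() can set up, such as the security-restricted operation flag and a new GUC nest level.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int SketchOid;                 /* hypothetical stand-in for Oid */

static SketchOid sketch_current_user = 10;      /* hypothetical subscription owner */

typedef struct
{
    SketchOid   save_userid;
} SketchUserContext;

static void
sketch_switch_to_user(SketchOid target, SketchUserContext *ucxt)
{
    ucxt->save_userid = sketch_current_user;    /* remember who we were */
    sketch_current_user = target;
}

static void
sketch_restore_user(const SketchUserContext *ucxt)
{
    sketch_current_user = ucxt->save_userid;
}

/*
 * Apply one change and return the user id in effect while it ran. As in
 * apply_handle_insert(), switch only when run_as_owner is false, and
 * restore only in that same case.
 */
static SketchOid
sketch_apply_change(SketchOid table_owner, bool run_as_owner)
{
    SketchUserContext ucxt;
    SketchOid   effective;

    if (!run_as_owner)
        sketch_switch_to_user(table_owner, &ucxt);

    effective = sketch_current_user;    /* triggers etc. would run as this user */

    if (!run_as_owner)
        sketch_restore_user(&ucxt);
    return effective;
}
```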
    2473                 : 
    2474                 : /*
    2475                 :  * Workhorse for apply_handle_insert()
    2476                 :  * relinfo is for the relation we're actually inserting into
    2477                 :  * (could be a child partition of edata->targetRelInfo)
    2478                 :  */
    2479                 : static void
    2480           75594 : apply_handle_insert_internal(ApplyExecutionData *edata,
    2481                 :                              ResultRelInfo *relinfo,
    2482                 :                              TupleTableSlot *remoteslot)
    2483                 : {
    2484 CBC       75594 :     EState     *estate = edata->estate;
    2485 ECB             : 
    2486                 :     /* We must open indexes here. */
    2487 GBC       75594 :     ExecOpenIndices(relinfo, false);
    2488 EUB             : 
    2489                 :     /* Do the insert. */
    2490 GIC       75594 :     TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
    2491           75588 :     ExecSimpleRelationInsert(relinfo, estate, remoteslot);
    2492 ECB             : 
    2493                 :     /* Cleanup. */
    2494 GIC       75579 :     ExecCloseIndices(relinfo);
    2495           75579 : }
    2496                 : 
    2497                 : /*
    2498                 :  * Check if the logical replication relation is updatable and throw
    2499 ECB             :  * appropriate error if it isn't.
    2500                 :  */
    2501                 : static void
    2502 CBC       72249 : check_relation_updatable(LogicalRepRelMapEntry *rel)
    2503                 : {
    2504 ECB             :     /*
    2505                 :      * For partitioned tables, we only need to care if the target partition is
    2506                 :      * updatable (aka has PK or RI defined for it).
    2507                 :      */
    2508 GIC       72249 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2509              29 :         return;
    2510                 : 
    2511                 :     /* Updatable, no error. */
    2512 CBC       72220 :     if (rel->updatable)
    2513 GIC       72220 :         return;
    2514                 : 
    2515                 :     /*
    2516                 :      * We are in error mode, so it's fine that this is somewhat slow. It's better
    2517                 :      * to give the user the correct error.
    2518 ECB             :      */
    2519 LBC           0 :     if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
    2520                 :     {
    2521 UIC           0 :         ereport(ERROR,
    2522                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2523                 :                  errmsg("publisher did not send replica identity column "
    2524                 :                         "expected by the logical replication target relation \"%s.%s\"",
    2525                 :                         rel->remoterel.nspname, rel->remoterel.relname)));
    2526                 :     }
    2527                 : 
    2528               0 :     ereport(ERROR,
    2529                 :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2530                 :              errmsg("logical replication target relation \"%s.%s\" has "
    2531                 :                     "neither REPLICA IDENTITY index nor PRIMARY "
    2532                 :                     "KEY and published relation does not have "
    2533                 :                     "REPLICA IDENTITY FULL",
    2534                 :                     rel->remoterel.nspname, rel->remoterel.relname)));
    2535                 : }
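check_relation_updatable() is a short decision chain. It only pays for the slower GetRelationIdentityOrPK() lookup once it already knows it will raise an error, and that lookup exists solely to choose the more helpful message. The sketch below reproduces the chain, with the relation properties reduced to hypothetical booleans.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
    UPDATABLE_OK,
    ERR_MISSING_REPLICA_IDENTITY_COLUMN,   /* "publisher did not send ..." */
    ERR_NO_IDENTITY_OR_PK                   /* "... has neither REPLICA IDENTITY ..." */
} UpdatableCheck;

static UpdatableCheck
sketch_check_updatable(bool is_partitioned, bool updatable,
                       bool has_identity_or_pk)
{
    if (is_partitioned)
        return UPDATABLE_OK;    /* only the target partition matters */
    if (updatable)
        return UPDATABLE_OK;    /* common, fast path */
    /* Error path: the slower lookup only picks which message to report. */
    if (has_identity_or_pk)
        return ERR_MISSING_REPLICA_IDENTITY_COLUMN;
    return ERR_NO_IDENTITY_OR_PK;
}
```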
    2536                 : 
    2537                 : /*
    2538                 :  * Handle UPDATE message.
    2539                 :  *
    2540 ECB             :  * TODO: FDW support
    2541                 :  */
    2542                 : static void
    2543 CBC       66137 : apply_handle_update(StringInfo s)
    2544 ECB             : {
    2545                 :     LogicalRepRelMapEntry *rel;
    2546                 :     LogicalRepRelId relid;
    2547                 :     UserContext     ucxt;
    2548                 :     ApplyExecutionData *edata;
    2549                 :     EState     *estate;
    2550                 :     LogicalRepTupleData oldtup;
    2551                 :     LogicalRepTupleData newtup;
    2552                 :     bool        has_oldtup;
    2553                 :     TupleTableSlot *remoteslot;
    2554                 :     RTEPermissionInfo *target_perminfo;
    2555                 :     MemoryContext oldctx;
    2556                 :     bool        run_as_owner;
    2557                 : 
    2558                 :     /*
    2559                 :      * Quick return if we are skipping data modification changes or handling
    2560                 :      * streamed transactions.
    2561                 :      */
    2562 GIC      132274 :     if (is_skipping_changes() ||
    2563 CBC       66137 :         handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
    2564 GIC       34220 :         return;
    2565                 : 
    2566 CBC       31917 :     begin_replication_step();
    2567 ECB             : 
    2568 CBC       31917 :     relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
    2569 ECB             :                                    &newtup);
    2570 GIC       31917 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2571           31917 :     if (!should_apply_changes_for_rel(rel))
    2572                 :     {
    2573 ECB             :         /*
    2574                 :          * The relation can't become interesting in the middle of the
    2575                 :          * transaction so it's safe to unlock it.
    2576                 :          */
    2577 UIC           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2578 LBC           0 :         end_replication_step();
    2579               0 :         return;
    2580 ECB             :     }
    2581                 : 
    2582                 :     /* Set relation for error callback */
    2583 CBC       31917 :     apply_error_callback_arg.rel = rel;
    2584                 : 
    2585                 :     /* Check if we can do the update. */
    2586           31917 :     check_relation_updatable(rel);
    2587                 : 
    2588                 :     /*
    2589                 :      * Make sure that any user-supplied code runs as the table owner, unless
    2590                 :      * the user has opted out of that behavior.
    2591                 :      */
    2592 GNC       31917 :     run_as_owner = MySubscription->runasowner;
    2593           31917 :     if (!run_as_owner)
    2594           31914 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2595                 : 
    2596 ECB             :     /* Initialize the executor state. */
    2597 CBC       31915 :     edata = create_edata_for_relation(rel);
    2598 GIC       31915 :     estate = edata->estate;
    2599           31915 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2600           31915 :                                         RelationGetDescr(rel->localrel),
    2601                 :                                         &TTSOpsVirtual);
    2602                 : 
    2603                 :     /*
    2604                 :      * Populate updatedCols so that per-column triggers can fire, and so
    2605 ECB             :      * executor can correctly pass down indexUnchanged hint.  This could
    2606                 :      * include more columns than were actually changed on the publisher
    2607                 :      * because the logical replication protocol doesn't contain that
    2608                 :      * information.  But it would for example exclude columns that only exist
    2609                 :      * on the subscriber, since we are not touching those.
    2610                 :      */
    2611 GNC       31915 :     target_perminfo = list_nth(estate->es_rteperminfos, 0);
    2612 GIC      159237 :     for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
    2613                 :     {
    2614 CBC      127322 :         Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
    2615 GIC      127322 :         int         remoteattnum = rel->attrmap->attnums[i];
    2616                 : 
    2617 CBC      127322 :         if (!att->attisdropped && remoteattnum >= 0)
    2618 EUB             :         {
    2619 GIC       68810 :             Assert(remoteattnum < newtup.ncols);
    2620           68810 :             if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
    2621 GNC       68807 :                 target_perminfo->updatedCols =
    2622           68807 :                     bms_add_member(target_perminfo->updatedCols,
    2623 ECB             :                                    i + 1 - FirstLowInvalidHeapAttributeNumber);
    2624                 :         }
    2625                 :     }
    2626                 : 
    2627                 :     /* Build the search tuple. */
    2628 CBC       31915 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2629 GIC       31915 :     slot_store_data(remoteslot, rel,
    2630 CBC       31915 :                     has_oldtup ? &oldtup : &newtup);
    2631 GIC       31915 :     MemoryContextSwitchTo(oldctx);
    2632 ECB             : 
    2633                 :     /* For a partitioned table, apply update to correct partition. */
    2634 CBC       31915 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2635 GIC          12 :         apply_handle_tuple_routing(edata,
    2636 ECB             :                                    remoteslot, &newtup, CMD_UPDATE);
    2637                 :     else
    2638 GIC       31903 :         apply_handle_update_internal(edata, edata->targetRelInfo,
    2639                 :                                      remoteslot, &newtup, rel->localindexoid);
    2640                 : 
    2641           31911 :     finish_edata(edata);
    2642 ECB             : 
    2643                 :     /* Reset relation for error callback */
    2644 CBC       31911 :     apply_error_callback_arg.rel = NULL;
    2645 ECB             : 
    2646 GNC       31911 :     if (!run_as_owner)
    2647           31909 :         RestoreUserContext(&ucxt);
    2648                 : 
    2649 GIC       31911 :     logicalrep_rel_close(rel, NoLock);
    2650 ECB             : 
    2651 CBC       31911 :     end_replication_step();
    2652                 : }
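The updatedCols loop in apply_handle_update() above turns each 0-based local column index i into bitmap member i + 1 - FirstLowInvalidHeapAttributeNumber. Because of this offset, system columns, which have negative attribute numbers, can share the same bitmap. The sketch below is a minimal, hypothetical illustration of that arithmetic in plain C. It makes three simplifications:

- a single 64-bit word replaces Bitmapset;
- the enum values are invented stand-ins for the LOGICALREP_COLUMN_* status codes;
- FirstLowInvalidHeapAttributeNumber is assumed to be -7, as in PostgreSQL 12 and later.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Assumption: FirstLowInvalidHeapAttributeNumber is -7 (PostgreSQL 12 and
 * later); verify against access/sysattr.h for the version in use.
 */
#define SKETCH_FIRST_LOW_INVALID_ATTNUM (-7)

/* Hypothetical stand-ins for the LOGICALREP_COLUMN_* status codes. */
typedef enum
{
    SKETCH_COL_NULL,
    SKETCH_COL_UNCHANGED,
    SKETCH_COL_TEXT
} SketchColStatus;

/* Bitmap member for 0-based local column i (whose attnum is i + 1). */
static int
sketch_updated_col_member(int i)
{
    return i + 1 - SKETCH_FIRST_LOW_INVALID_ATTNUM;
}

/*
 * Mark every local column that is not dropped, exists on the publisher
 * (attnums[i] >= 0), and was not sent as "unchanged".  A single uint64_t
 * stands in for a Bitmapset, so this sketch handles at most 56 columns
 * (member 55 + 8 = 63 is the highest bit).
 */
static uint64_t
sketch_compute_updated_cols(int natts, const bool *attisdropped,
                            const int *attnums,
                            const SketchColStatus *colstatus)
{
    uint64_t    cols = 0;

    assert(natts <= 56);

    for (int i = 0; i < natts; i++)
    {
        int         remoteattnum = attnums[i];

        if (!attisdropped[i] && remoteattnum >= 0 &&
            colstatus[remoteattnum] != SKETCH_COL_UNCHANGED)
            cols |= UINT64_C(1) << sketch_updated_col_member(i);
    }
    return cols;
}
```

For example, suppose the local table has columns (a, b, a dropped column, a subscriber-only column), mapped to remote columns 0, 1, -1 and -1, and remote column 1 arrives as unchanged. Only column a is marked, at bitmap member 8.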
    2653                 : 
    2654                 : /*
    2655                 :  * Workhorse for apply_handle_update()
    2656                 :  * relinfo is for the relation we're actually updating in
    2657                 :  * (could be a child partition of edata->targetRelInfo)
    2658                 :  */
    2659                 : static void
    2660 GIC       31903 : apply_handle_update_internal(ApplyExecutionData *edata,
    2661                 :                              ResultRelInfo *relinfo,
    2662                 :                              TupleTableSlot *remoteslot,
    2663                 :                              LogicalRepTupleData *newtup,
    2664                 :                              Oid localindexoid)
    2665                 : {
    2666           31903 :     EState     *estate = edata->estate;
    2667           31903 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    2668           31903 :     Relation    localrel = relinfo->ri_RelationDesc;
    2669 ECB             :     EPQState    epqstate;
    2670                 :     TupleTableSlot *localslot;
    2671                 :     bool        found;
    2672                 :     MemoryContext oldctx;
    2673                 : 
    2674 GIC       31903 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    2675           31903 :     ExecOpenIndices(relinfo, false);
    2676 ECB             : 
    2677 GIC       31903 :     found = FindReplTupleInLocalRel(estate, localrel,
    2678                 :                                     &relmapentry->remoterel,
    2679                 :                                     localindexoid,
    2680                 :                                     remoteslot, &localslot);
    2681           31899 :     ExecClearTuple(remoteslot);
    2682                 : 
    2683                 :     /*
    2684                 :      * Tuple found.
    2685                 :      *
    2686                 :      * Note this will fail if there are other conflicting unique indexes.
    2687                 :      */
    2688           31899 :     if (found)
    2689                 :     {
    2690                 :         /* Process and store remote tuple in the slot */
    2691           31898 :         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2692           31898 :         slot_modify_data(remoteslot, localslot, relmapentry, newtup);
    2693           31898 :         MemoryContextSwitchTo(oldctx);
    2694                 : 
    2695           31898 :         EvalPlanQualSetSlot(&epqstate, remoteslot);
    2696                 : 
    2697                 :         /* Do the actual update. */
    2698           31898 :         TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
    2699 CBC       31898 :         ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
    2700 ECB             :                                  remoteslot);
    2701                 :     }
    2702                 :     else
    2703                 :     {
    2704                 :         /*
    2705                 :          * The tuple to be updated could not be found.  Do nothing except for
    2706                 :          * emitting a log message.
    2707                 :          *
    2708                 :          * XXX should this be promoted to ereport(LOG) perhaps?
    2709 EUB             :          */
    2710 GIC           1 :         elog(DEBUG1,
    2711                 :              "logical replication did not find row to be updated "
    2712 ECB             :              "in replication target relation \"%s\"",
    2713                 :              RelationGetRelationName(localrel));
    2714                 :     }
    2715                 : 
    2716                 :     /* Cleanup. */
    2717 GIC       31899 :     ExecCloseIndices(relinfo);
    2718           31899 :     EvalPlanQualEnd(&epqstate);
    2719           31899 : }
    2720 ECB             : 
    2721                 : /*
    2722                 :  * Handle DELETE message.
    2723                 :  *
    2724                 :  * TODO: FDW support
    2725                 :  */
    2726                 : static void
    2727 GIC       81918 : apply_handle_delete(StringInfo s)
    2728 ECB             : {
    2729                 :     LogicalRepRelMapEntry *rel;
    2730                 :     LogicalRepTupleData oldtup;
    2731                 :     LogicalRepRelId relid;
    2732                 :     UserContext     ucxt;
    2733                 :     ApplyExecutionData *edata;
    2734                 :     EState     *estate;
    2735                 :     TupleTableSlot *remoteslot;
    2736                 :     MemoryContext oldctx;
    2737                 :     bool        run_as_owner;
    2738                 : 
    2739                 :     /*
    2740                 :      * Quick return if we are skipping data modification changes or handling
    2741                 :      * streamed transactions.
    2742                 :      */
    2743 GIC      163836 :     if (is_skipping_changes() ||
    2744           81918 :         handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
    2745           41615 :         return;
    2746                 : 
    2747           40303 :     begin_replication_step();
    2748                 : 
    2749           40303 :     relid = logicalrep_read_delete(s, &oldtup);
    2750           40303 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2751 CBC       40303 :     if (!should_apply_changes_for_rel(rel))
    2752 ECB             :     {
    2753                 :         /*
    2754                 :          * The relation can't become interesting in the middle of the
    2755                 :          * transaction so it's safe to unlock it.
    2756                 :          */
    2757 UBC           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2758               0 :         end_replication_step();
    2759 UIC           0 :         return;
    2760                 :     }
    2761                 : 
    2762 ECB             :     /* Set relation for error callback */
    2763 CBC       40303 :     apply_error_callback_arg.rel = rel;
    2764                 : 
    2765                 :     /* Check if we can do the delete. */
    2766 GIC       40303 :     check_relation_updatable(rel);
    2767                 : 
    2768                 :     /*
    2769                 :      * Make sure that any user-supplied code runs as the table owner, unless
    2770                 :      * the user has opted out of that behavior.
    2771                 :      */
    2772 GNC       40303 :     run_as_owner = MySubscription->runasowner;
    2773           40303 :     if (!run_as_owner)
    2774           40301 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2775                 : 
    2776                 :     /* Initialize the executor state. */
    2777 CBC       40303 :     edata = create_edata_for_relation(rel);
    2778 GIC       40303 :     estate = edata->estate;
    2779           40303 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2780           40303 :                                         RelationGetDescr(rel->localrel),
    2781                 :                                         &TTSOpsVirtual);
    2782                 : 
    2783                 :     /* Build the search tuple. */
    2784           40303 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2785 CBC       40303 :     slot_store_data(remoteslot, rel, &oldtup);
    2786 GIC       40303 :     MemoryContextSwitchTo(oldctx);
    2787 ECB             : 
    2788                 :     /* For a partitioned table, apply delete to correct partition. */
    2789 CBC       40303 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2790 GIC          17 :         apply_handle_tuple_routing(edata,
    2791 ECB             :                                    remoteslot, NULL, CMD_DELETE);
    2792                 :     else
    2793 CBC       40286 :         apply_handle_delete_internal(edata, edata->targetRelInfo,
    2794                 :                                      remoteslot, rel->localindexoid);
    2795                 : 
    2796           40303 :     finish_edata(edata);
    2797                 : 
    2798 ECB             :     /* Reset relation for error callback */
    2799 GIC       40303 :     apply_error_callback_arg.rel = NULL;
    2800 ECB             : 
    2801 GNC       40303 :     if (!run_as_owner)
    2802           40301 :         RestoreUserContext(&ucxt);
    2803                 : 
    2804 GBC       40303 :     logicalrep_rel_close(rel, NoLock);
    2805                 : 
    2806 CBC       40303 :     end_replication_step();
    2807                 : }
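Both apply_handle_update() and apply_handle_delete() above wrap their executor work in the same pattern. Unless the subscription's run_as_owner option is set, they switch to the table owner before any user-supplied code can run, then restore the saved context afterwards. The sketch below is a minimal, hypothetical version of that save/switch/restore shape. A global integer stands in for the current user ID, and every name in it is invented. The real SwitchToUntrustedUser() and RestoreUserContext() also perform role-membership checks and may restrict the security context; the sketch omits both.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the backend's current user ID. */
static int  sketch_current_userid = 10;

/* Hypothetical, simplified analogue of UserContext. */
typedef struct
{
    int         save_userid;
} SketchUserContext;

static void
sketch_switch_to_user(int userid, SketchUserContext *ctx)
{
    ctx->save_userid = sketch_current_userid;
    sketch_current_userid = userid;
}

static void
sketch_restore_user(const SketchUserContext *ctx)
{
    sketch_current_userid = ctx->save_userid;
}

/*
 * Apply one change and return the user ID that user-supplied code would
 * have run as.  As in the apply handlers, the switch and the restore are
 * guarded by the same run_as_owner test.
 */
static int
sketch_apply_change(bool run_as_owner, int relowner)
{
    SketchUserContext ucxt = {0};
    int         before = sketch_current_userid;
    int         ran_as;

    if (!run_as_owner)
        sketch_switch_to_user(relowner, &ucxt);

    ran_as = sketch_current_userid;     /* executor work would happen here */

    if (!run_as_owner)
        sketch_restore_user(&ucxt);

    /* Either way, the caller's user ID must be back in place. */
    assert(sketch_current_userid == before);
    return ran_as;
}
```

Guarding both calls with the same condition keeps the restore paired with the switch, so no path leaves the worker running as the table owner.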
    2808                 : 
    2809                 : /*
    2810                 :  * Workhorse for apply_handle_delete()
    2811                 :  * relinfo is for the relation we're actually deleting from
    2812 ECB             :  * (could be a child partition of edata->targetRelInfo)
    2813                 :  */
    2814                 : static void
    2815 GIC       40304 : apply_handle_delete_internal(ApplyExecutionData *edata,
    2816                 :                              ResultRelInfo *relinfo,
    2817                 :                              TupleTableSlot *remoteslot,
    2818                 :                              Oid localindexoid)
    2819 ECB             : {
    2820 GIC       40304 :     EState     *estate = edata->estate;
    2821           40304 :     Relation    localrel = relinfo->ri_RelationDesc;
    2822           40304 :     LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
    2823                 :     EPQState    epqstate;
    2824                 :     TupleTableSlot *localslot;
    2825 ECB             :     bool        found;
    2826                 : 
    2827 GIC       40304 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    2828           40304 :     ExecOpenIndices(relinfo, false);
    2829 ECB             : 
    2830 GNC       40304 :     found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
    2831                 :                                     remoteslot, &localslot);
    2832                 : 
    2833                 :     /* If found delete it. */
    2834 GIC       40304 :     if (found)
    2835                 :     {
    2836 CBC       40299 :         EvalPlanQualSetSlot(&epqstate, localslot);
    2837                 : 
    2838                 :         /* Do the actual delete. */
    2839           40299 :         TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
    2840           40299 :         ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
    2841                 :     }
    2842                 :     else
    2843                 :     {
    2844                 :         /*
    2845                 :          * The tuple to be deleted could not be found.  Do nothing except for
    2846 ECB             :          * emitting a log message.
    2847                 :          *
    2848                 :          * XXX should this be promoted to ereport(LOG) perhaps?
    2849                 :          */
    2850 GIC           5 :         elog(DEBUG1,
    2851 ECB             :              "logical replication did not find row to be deleted "
    2852                 :              "in replication target relation \"%s\"",
    2853                 :              RelationGetRelationName(localrel));
    2854                 :     }
    2855                 : 
    2856                 :     /* Cleanup. */
    2857 GIC       40304 :     ExecCloseIndices(relinfo);
    2858 CBC       40304 :     EvalPlanQualEnd(&epqstate);
    2859 GIC       40304 : }
    2860                 : 
    2861                 : /*
    2862                 :  * Try to find a tuple received from the publication side (in 'remoteslot') in
    2863                 :  * the corresponding local relation using either replica identity index,
    2864                 :  * primary key, index or if needed, sequential scan.
    2865 ECB             :  *
    2866                 :  * Local tuple, if found, is returned in '*localslot'.
    2867                 :  */
    2868                 : static bool
    2869 GIC       72219 : FindReplTupleInLocalRel(EState *estate, Relation localrel,
    2870                 :                         LogicalRepRelation *remoterel,
    2871                 :                         Oid localidxoid,
    2872                 :                         TupleTableSlot *remoteslot,
    2873                 :                         TupleTableSlot **localslot)
    2874 ECB             : {
    2875                 :     bool        found;
    2876                 : 
    2877                 :     /*
    2878                 :      * Regardless of the top-level operation, we're performing a read here, so
    2879                 :      * check for SELECT privileges.
    2880                 :      */
    2881 GIC       72219 :     TargetPrivilegesCheck(localrel, ACL_SELECT);
    2882 ECB             : 
    2883 GIC       72215 :     *localslot = table_slot_create(localrel, &estate->es_tupleTable);
    2884                 : 
    2885 GNC       72215 :     Assert(OidIsValid(localidxoid) ||
    2886                 :            (remoterel->replident == REPLICA_IDENTITY_FULL));
    2887                 : 
    2888           72215 :     if (OidIsValid(localidxoid))
    2889           72071 :         found = RelationFindReplTupleByIndex(localrel, localidxoid,
    2890                 :                                              LockTupleExclusive,
    2891                 :                                              remoteslot, *localslot);
    2892                 :     else
    2893 CBC         144 :         found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
    2894                 :                                          remoteslot, *localslot);
    2895                 : 
    2896           72215 :     return found;
    2897                 : }
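FindReplTupleInLocalRel() above has two lookup paths. When localidxoid is valid, it probes that index. Otherwise it falls back to a sequential scan, which its Assert restricts to REPLICA IDENTITY FULL. The hypothetical sketch below shows why the two paths differ. The index path matches on the key column alone, as a replica identity or primary key index would, while the full-identity scan must compare every column. A small array of rows sorted by id stands in for both the table and its index. This is a deliberate simplification, not the executor's lookup code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical two-column row; "id" plays the role of the key. */
typedef struct
{
    int         id;
    int         val;
} SketchRow;

/* bsearch() comparator: key points to an int, elem to a SketchRow. */
static int
sketch_cmp_id(const void *key, const void *elem)
{
    int         k = *(const int *) key;
    int         id = ((const SketchRow *) elem)->id;

    return (k > id) - (k < id);
}

/*
 * Find the local row matching "search".  With an "index" (rows sorted by
 * id), look up by key only; without one, scan every row and require all
 * columns to match, as REPLICA IDENTITY FULL does.  Returns NULL if no row
 * matches.
 */
static const SketchRow *
sketch_find_repl_row(const SketchRow *rows, size_t nrows, bool has_index,
                     const SketchRow *search)
{
    assert(rows != NULL || nrows == 0);

    if (has_index)
        return bsearch(&search->id, rows, nrows, sizeof(SketchRow),
                       sketch_cmp_id);

    for (size_t i = 0; i < nrows; i++)
    {
        if (rows[i].id == search->id && rows[i].val == search->val)
            return &rows[i];
    }
    return NULL;
}
```

Consider a search tuple {2, 99} against rows {1, 10}, {2, 20} and {3, 30}. The keyed lookup finds the second row, but the full-row scan finds nothing, because the non-key column differs.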
    2898 ECB             : 
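The function that follows, apply_handle_tuple_routing(), handles an UPDATE on a partitioned table in three steps:

1. It finds the local row in the partition that holds the search tuple.
2. It applies the remote change to that row.
3. It checks whether the result still satisfies that partition's constraint. If so, it updates in place. If not, it deletes the old row and routes the new one through the parent to a different partition.

The decision can be sketched as follows. The sketch assumes hypothetical range partitions with half-open [lo, hi) bounds on a single integer key, and all of its names are invented. In the real code, finding no partition for a row raises an error inside ExecFindPartition() rather than returning a status.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical range partition accepting keys in [lo, hi). */
typedef struct
{
    int         lo;
    int         hi;
} SketchRangePart;

typedef enum
{
    SKETCH_UPDATE_IN_PLACE,     /* new row still fits its current partition */
    SKETCH_MOVE_ROW,            /* DELETE from old partition, INSERT into new */
    SKETCH_NO_PARTITION         /* no partition accepts the new key */
} SketchUpdateAction;

static bool
sketch_part_contains(const SketchRangePart *part, int key)
{
    return key >= part->lo && key < part->hi;
}

/* Linear stand-in for tuple routing: accepting partition index, or -1. */
static int
sketch_find_partition(const SketchRangePart *parts, int nparts, int key)
{
    for (int i = 0; i < nparts; i++)
    {
        if (sketch_part_contains(&parts[i], key))
            return i;
    }
    return -1;
}

/*
 * Decide how to apply an UPDATE whose row currently lives in parts[cur] and
 * whose updated key is new_key.  *target receives the destination index.
 */
static SketchUpdateAction
sketch_plan_partition_update(const SketchRangePart *parts, int nparts,
                             int cur, int new_key, int *target)
{
    assert(cur >= 0 && cur < nparts);

    if (sketch_part_contains(&parts[cur], new_key))
    {
        *target = cur;
        return SKETCH_UPDATE_IN_PLACE;
    }

    /* Like the Assert in the real code, a move always targets a new partition. */
    *target = sketch_find_partition(parts, nparts, new_key);
    return (*target >= 0) ? SKETCH_MOVE_ROW : SKETCH_NO_PARTITION;
}
```

With partitions [0, 10) and [10, 20), updating a row in the first partition to key 5 stays in place. Updating it to key 15 moves it to the second partition. No partition accepts key 25.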
    2899                 : /*
    2900                 :  * This handles insert, update, delete on a partitioned table.
    2901                 :  */
    2902                 : static void
    2903 GIC          73 : apply_handle_tuple_routing(ApplyExecutionData *edata,
    2904                 :                            TupleTableSlot *remoteslot,
    2905 ECB             :                            LogicalRepTupleData *newtup,
    2906                 :                            CmdType operation)
    2907                 : {
    2908 GIC          73 :     EState     *estate = edata->estate;
    2909 CBC          73 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    2910 GIC          73 :     ResultRelInfo *relinfo = edata->targetRelInfo;
    2911 CBC          73 :     Relation    parentrel = relinfo->ri_RelationDesc;
    2912                 :     ModifyTableState *mtstate;
    2913 ECB             :     PartitionTupleRouting *proute;
    2914                 :     ResultRelInfo *partrelinfo;
    2915                 :     Relation    partrel;
    2916                 :     TupleTableSlot *remoteslot_part;
    2917                 :     TupleConversionMap *map;
    2918                 :     MemoryContext oldctx;
    2919 GIC          73 :     LogicalRepRelMapEntry *part_entry = NULL;
    2920 CBC          73 :     AttrMap    *attrmap = NULL;
    2921                 : 
    2922 ECB             :     /* ModifyTableState is needed for ExecFindPartition(). */
    2923 CBC          73 :     edata->mtstate = mtstate = makeNode(ModifyTableState);
    2924 GIC          73 :     mtstate->ps.plan = NULL;
    2925              73 :     mtstate->ps.state = estate;
    2926 CBC          73 :     mtstate->operation = operation;
    2927              73 :     mtstate->resultRelInfo = relinfo;
    2928                 : 
    2929                 :     /* ... as is PartitionTupleRouting. */
    2930 GIC          73 :     edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
    2931 ECB             : 
    2932                 :     /*
    2933                 :      * Find the partition to which the "search tuple" belongs.
    2934                 :      */
    2935 GIC          73 :     Assert(remoteslot != NULL);
    2936              73 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2937 CBC          73 :     partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
    2938                 :                                     remoteslot, estate);
    2939 GIC          73 :     Assert(partrelinfo != NULL);
    2940              73 :     partrel = partrelinfo->ri_RelationDesc;
    2941                 : 
    2942                 :     /*
    2943                 :      * Check for supported relkind.  We need this since partitions might be of
    2944 ECB             :      * unsupported relkinds; and the set of partitions can change, so checking
    2945                 :      * at CREATE/ALTER SUBSCRIPTION would be insufficient.
    2946                 :      */
    2947 GIC          73 :     CheckSubscriptionRelkind(partrel->rd_rel->relkind,
    2948              73 :                              get_namespace_name(RelationGetNamespace(partrel)),
    2949              73 :                              RelationGetRelationName(partrel));
    2950                 : 
    2951                 :     /*
    2952 ECB             :      * To perform any of the operations below, the tuple must match the
    2953                 :      * partition's rowtype. Convert if needed or just copy, using a dedicated
    2954                 :      * slot to store the tuple in any case.
    2955 EUB             :      */
    2956 GIC          73 :     remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
    2957              73 :     if (remoteslot_part == NULL)
    2958              41 :         remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
    2959 GNC          73 :     map = ExecGetRootToChildMap(partrelinfo, estate);
    2960 CBC          73 :     if (map != NULL)
    2961                 :     {
    2962              32 :         attrmap = map->attrMap;
    2963 GIC          32 :         remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
    2964 ECB             :                                                 remoteslot_part);
    2965                 :     }
    2966                 :     else
    2967                 :     {
    2968 GIC          41 :         remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
    2969              41 :         slot_getallattrs(remoteslot_part);
    2970                 :     }
    2971              73 :     MemoryContextSwitchTo(oldctx);
    2972 ECB             : 
    2973                 :     /* Check if we can do the update or delete on the leaf partition. */
    2974 GIC          73 :     if (operation == CMD_UPDATE || operation == CMD_DELETE)
    2975 ECB             :     {
    2976 GIC          29 :         part_entry = logicalrep_partition_open(relmapentry, partrel,
    2977                 :                                                attrmap);
    2978 CBC          29 :         check_relation_updatable(part_entry);
    2979                 :     }
    2980 ECB             : 
    2981 CBC          73 :     switch (operation)
    2982                 :     {
    2983              44 :         case CMD_INSERT:
    2984              44 :             apply_handle_insert_internal(edata, partrelinfo,
    2985                 :                                          remoteslot_part);
    2986              44 :             break;
    2987                 : 
    2988 GIC          17 :         case CMD_DELETE:
    2989 CBC          17 :             apply_handle_delete_internal(edata, partrelinfo,
    2990                 :                                          remoteslot_part,
    2991                 :                                          part_entry->localindexoid);
    2992 GIC          17 :             break;
    2993                 : 
    2994              12 :         case CMD_UPDATE:
    2995                 : 
    2996                 :             /*
    2997                 :              * For UPDATE, depending on whether or not the updated tuple
    2998 EUB             :              * satisfies the partition's constraint, perform a simple UPDATE
    2999                 :              * of the partition or move the updated tuple into a different
    3000                 :              * suitable partition.
    3001 ECB             :              */
    3002                 :             {
    3003                 :                 TupleTableSlot *localslot;
    3004                 :                 ResultRelInfo *partrelinfo_new;
    3005                 :                 Relation    partrel_new;
    3006                 :                 bool        found;
    3007                 : 
    3008                 :                 /* Get the matching local tuple from the partition. */
    3009 GIC          12 :                 found = FindReplTupleInLocalRel(estate, partrel,
    3010 ECB             :                                                 &part_entry->remoterel,
    3011                 :                                                 part_entry->localindexoid,
    3012                 :                                                 remoteslot_part, &localslot);
    3013 GIC          12 :                 if (!found)
    3014 ECB             :                 {
    3015                 :                     /*
    3016                 :                      * The tuple to be updated could not be found.  Do nothing
    3017                 :                      * except for emitting a log message.
    3018                 :                      *
    3019                 :                      * XXX should this be promoted to ereport(LOG) perhaps?
    3020                 :                      */
    3021 CBC           2 :                     elog(DEBUG1,
    3022                 :                          "logical replication did not find row to be updated "
    3023 ECB             :                          "in replication target relation's partition \"%s\"",
    3024                 :                          RelationGetRelationName(partrel));
    3025 CBC           2 :                     return;
    3026                 :                 }
    3027                 : 
    3028                 :                 /*
    3029                 :                  * Apply the update to the local tuple, putting the result in
    3030                 :                  * remoteslot_part.
    3031 ECB             :                  */
    3032 CBC          10 :                 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3033 GIC          10 :                 slot_modify_data(remoteslot_part, localslot, part_entry,
    3034 ECB             :                                  newtup);
    3035 GIC          10 :                 MemoryContextSwitchTo(oldctx);
    3036 ECB             : 
    3037                 :                 /*
    3038                 :                  * Does the updated tuple still satisfy the current
    3039 EUB             :                  * partition's constraint?
    3040                 :                  */
    3041 GIC          20 :                 if (!partrel->rd_rel->relispartition ||
    3042              10 :                     ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
    3043                 :                                        false))
    3044               9 :                 {
    3045 ECB             :                     /*
    3046                 :                      * Yes, so simply UPDATE the partition.  We don't call
    3047                 :                      * apply_handle_update_internal() here, which would
    3048                 :                      * normally do the following work, to avoid repeating some
    3049                 :                      * work already done above to find the local tuple in the
    3050                 :                      * partition.
    3051                 :                      */
    3052                 :                     EPQState    epqstate;
    3053                 : 
    3054 GIC           9 :                     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    3055               9 :                     ExecOpenIndices(partrelinfo, false);
    3056 ECB             : 
    3057 GIC           9 :                     EvalPlanQualSetSlot(&epqstate, remoteslot_part);
    3058 CBC           9 :                     TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
    3059                 :                                           ACL_UPDATE);
    3060               9 :                     ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
    3061                 :                                              localslot, remoteslot_part);
    3062 GIC           9 :                     ExecCloseIndices(partrelinfo);
    3063               9 :                     EvalPlanQualEnd(&epqstate);
    3064                 :                 }
    3065                 :                 else
    3066 ECB             :                 {
    3067                 :                     /* Move the tuple into the new partition. */
    3068                 : 
    3069                 :                     /*
    3070                 :                      * New partition will be found using tuple routing, which
    3071                 :                      * can only occur via the parent table.  We might need to
    3072                 :                      * convert the tuple to the parent's rowtype.  Note that
    3073                 :                      * this is the tuple found in the partition, not the
    3074                 :                      * original search tuple received by this function.
    3075                 :                      */
    3076 CBC           1 :                     if (map)
    3077                 :                     {
    3078                 :                         TupleConversionMap *PartitionToRootMap =
    3079 GIC           1 :                         convert_tuples_by_name(RelationGetDescr(partrel),
    3080                 :                                                RelationGetDescr(parentrel));
    3081                 : 
    3082 ECB             :                         remoteslot =
    3083 CBC           1 :                             execute_attr_map_slot(PartitionToRootMap->attrMap,
    3084                 :                                                   remoteslot_part, remoteslot);
    3085 ECB             :                     }
    3086                 :                     else
    3087                 :                     {
    3088 UIC           0 :                         remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
    3089 LBC           0 :                         slot_getallattrs(remoteslot);
    3090 ECB             :                     }
    3091                 : 
    3092                 :                     /* Find the new partition. */
    3093 CBC           1 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3094 GIC           1 :                     partrelinfo_new = ExecFindPartition(mtstate, relinfo,
    3095 ECB             :                                                         proute, remoteslot,
    3096                 :                                                         estate);
    3097 GIC           1 :                     MemoryContextSwitchTo(oldctx);
    3098               1 :                     Assert(partrelinfo_new != partrelinfo);
    3099               1 :                     partrel_new = partrelinfo_new->ri_RelationDesc;
    3100 EUB             : 
    3101                 :                     /* Check that new partition also has supported relkind. */
    3102 GIC           1 :                     CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
    3103               1 :                                              get_namespace_name(RelationGetNamespace(partrel_new)),
    3104 CBC           1 :                                              RelationGetRelationName(partrel_new));
    3105 ECB             : 
    3106                 :                     /* DELETE old tuple found in the old partition. */
    3107 GIC           1 :                     apply_handle_delete_internal(edata, partrelinfo,
    3108                 :                                                  localslot,
    3109                 :                                                  part_entry->localindexoid);
    3110                 : 
    3111                 :                     /* INSERT new tuple into the new partition. */
    3112                 : 
    3113                 :                     /*
    3114                 :                      * Convert the replacement tuple to match the destination
    3115                 :                      * partition rowtype.
    3116                 :                      */
    3117 CBC           1 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3118 GIC           1 :                     remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
    3119               1 :                     if (remoteslot_part == NULL)
    3120               1 :                         remoteslot_part = table_slot_create(partrel_new,
    3121 ECB             :                                                             &estate->es_tupleTable);
    3122 GNC           1 :                     map = ExecGetRootToChildMap(partrelinfo_new, estate);
    3123 GIC           1 :                     if (map != NULL)
    3124 ECB             :                     {
    3125 LBC           0 :                         remoteslot_part = execute_attr_map_slot(map->attrMap,
    3126                 :                                                                 remoteslot,
    3127                 :                                                                 remoteslot_part);
    3128 ECB             :                     }
    3129                 :                     else
    3130                 :                     {
    3131 GIC           1 :                         remoteslot_part = ExecCopySlot(remoteslot_part,
    3132                 :                                                        remoteslot);
    3133               1 :                         slot_getallattrs(remoteslot);
    3134                 :                     }
    3135               1 :                     MemoryContextSwitchTo(oldctx);
    3136               1 :                     apply_handle_insert_internal(edata, partrelinfo_new,
    3137                 :                                                  remoteslot_part);
    3138                 :                 }
    3139                 :             }
    3140 CBC          10 :             break;
    3141                 : 
    3142 UIC           0 :         default:
    3143               0 :             elog(ERROR, "unrecognized CmdType: %d", (int) operation);
    3144 ECB             :             break;
    3145 EUB             :     }
    3146                 : }
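When a replicated UPDATE changes the partition key so that the row no longer fits its current partition, the code above does not update the row in place. It deletes the tuple from the old partition and inserts the converted replacement into the partition that tuple routing through the parent selects. The standalone sketch below shows that delete-then-insert pattern. It assumes integer keys and range partitions; `Row`, `Partition`, `route_row`, `find_row` and `apply_update` are hypothetical names, not PostgreSQL APIs. Rowtype conversion maps and memory contexts are left out entirely.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical fixed-capacity partition holding rows keyed by an int. */
#define MAX_ROWS 16

typedef struct Row { int key; int value; } Row;

typedef struct Partition {
    int lo, hi;                 /* accepts keys in [lo, hi) */
    Row rows[MAX_ROWS];
    int nrows;
} Partition;

/* Tuple routing: return the partition whose range contains key, or NULL. */
static Partition *route_row(Partition *parts, int nparts, int key)
{
    for (int i = 0; i < nparts; i++)
        if (key >= parts[i].lo && key < parts[i].hi)
            return &parts[i];
    return NULL;
}

/* Return the index of the row with this key in p, or -1 if absent. */
static int find_row(const Partition *p, int key)
{
    for (int i = 0; i < p->nrows; i++)
        if (p->rows[i].key == key)
            return i;
    return -1;
}

/* Apply UPDATE old_key -> new_row. Returns 0 on success, -1 otherwise. */
static int apply_update(Partition *parts, int nparts, int old_key, Row new_row)
{
    Partition *oldp = route_row(parts, nparts, old_key);
    Partition *newp = route_row(parts, nparts, new_row.key);
    int idx = oldp ? find_row(oldp, old_key) : -1;

    if (idx < 0 || newp == NULL)
        return -1;              /* row not found, or no partition accepts it */

    if (newp == oldp)
    {
        oldp->rows[idx] = new_row;  /* key stays in range: update in place */
        return 0;
    }

    /* Cross-partition move: make sure the INSERT can succeed first. */
    if (newp->nrows >= MAX_ROWS)
        return -1;

    /* DELETE the old row from the old partition (swap with the last row). */
    assert(oldp->nrows > 0);
    oldp->nrows--;
    oldp->rows[idx] = oldp->rows[oldp->nrows];

    /* INSERT the new row into the partition chosen by routing. */
    newp->rows[newp->nrows++] = new_row;
    return 0;
}
```

The sketch checks capacity before deleting, so a failed move never loses the row. The real apply worker instead relies on both steps running inside the same transaction.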
    3147 ECB             : 
    3148                 : /*
    3149                 :  * Handle TRUNCATE message.
    3150                 :  *
    3151                 :  * TODO: FDW support
    3152                 :  */
    3153                 : static void
    3154 GIC          17 : apply_handle_truncate(StringInfo s)
    3155 ECB             : {
    3156 GIC          17 :     bool        cascade = false;
    3157              17 :     bool        restart_seqs = false;
    3158              17 :     List       *remote_relids = NIL;
    3159              17 :     List       *remote_rels = NIL;
    3160 CBC          17 :     List       *rels = NIL;
    3161              17 :     List       *part_rels = NIL;
    3162              17 :     List       *relids = NIL;
    3163              17 :     List       *relids_logged = NIL;
    3164 ECB             :     ListCell   *lc;
    3165 CBC          17 :     LOCKMODE    lockmode = AccessExclusiveLock;
    3166                 : 
    3167                 :     /*
    3168                 :      * Quick return if we are skipping data modification changes or handling
    3169                 :      * streamed transactions.
    3170                 :      */
    3171 GIC          34 :     if (is_skipping_changes() ||
    3172              17 :         handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
    3173 UIC           0 :         return;
    3174 ECB             : 
    3175 CBC          17 :     begin_replication_step();
    3176                 : 
    3177 GIC          17 :     remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
    3178                 : 
    3179              43 :     foreach(lc, remote_relids)
    3180 ECB             :     {
    3181 GIC          26 :         LogicalRepRelId relid = lfirst_oid(lc);
    3182                 :         LogicalRepRelMapEntry *rel;
    3183                 : 
    3184              26 :         rel = logicalrep_rel_open(relid, lockmode);
    3185              26 :         if (!should_apply_changes_for_rel(rel))
    3186                 :         {
    3187 ECB             :             /*
    3188                 :              * The relation can't become interesting in the middle of the
    3189                 :              * transaction so it's safe to unlock it.
    3190                 :              */
    3191 UIC           0 :             logicalrep_rel_close(rel, lockmode);
    3192               0 :             continue;
    3193                 :         }
    3194                 : 
    3195 GIC          26 :         remote_rels = lappend(remote_rels, rel);
    3196              26 :         TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
    3197              26 :         rels = lappend(rels, rel->localrel);
    3198              26 :         relids = lappend_oid(relids, rel->localreloid);
    3199              26 :         if (RelationIsLogicallyLogged(rel->localrel))
    3200              26 :             relids_logged = lappend_oid(relids_logged, rel->localreloid);
    3201                 : 
    3202                 :         /*
    3203 ECB             :          * Truncate partitions if we got a message to truncate a partitioned
    3204                 :          * table.
    3205                 :          */
    3206 GIC          26 :         if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    3207 ECB             :         {
    3208                 :             ListCell   *child;
    3209 CBC           4 :             List       *children = find_all_inheritors(rel->localreloid,
    3210 ECB             :                                                        lockmode,
    3211                 :                                                        NULL);
    3212                 : 
    3213 GIC          15 :             foreach(child, children)
    3214                 :             {
    3215              11 :                 Oid         childrelid = lfirst_oid(child);
    3216                 :                 Relation    childrel;
    3217 ECB             : 
    3218 CBC          11 :                 if (list_member_oid(relids, childrelid))
    3219               4 :                     continue;
    3220                 : 
    3221                 :                 /* find_all_inheritors already got lock */
    3222 GIC           7 :                 childrel = table_open(childrelid, NoLock);
    3223                 : 
    3224                 :                 /*
    3225                 :                  * Ignore temp tables of other backends.  See similar code in
    3226 ECB             :                  * ExecuteTruncate().
    3227                 :                  */
    3228 CBC           7 :                 if (RELATION_IS_OTHER_TEMP(childrel))
    3229                 :                 {
    3230 UIC           0 :                     table_close(childrel, lockmode);
    3231 LBC           0 :                     continue;
    3232                 :                 }
    3233                 : 
    3234 CBC           7 :                 TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
    3235               7 :                 rels = lappend(rels, childrel);
    3236               7 :                 part_rels = lappend(part_rels, childrel);
    3237               7 :                 relids = lappend_oid(relids, childrelid);
    3238                 :                 /* Log this relation only if needed for logical decoding */
    3239 GIC           7 :                 if (RelationIsLogicallyLogged(childrel))
    3240               7 :                     relids_logged = lappend_oid(relids_logged, childrelid);
    3241 ECB             :             }
    3242                 :         }
    3243                 :     }
    3244                 : 
    3245                 :     /*
    3246                 :      * Even if we used CASCADE on the upstream primary, we explicitly default
    3247                 :      * to replaying changes without further cascading. This might later be
    3248                 :      * made configurable with a user-specified option.
    3249                 :      *
    3250                 :      * MySubscription->runasowner tells us whether we want to execute
    3251                 :      * replication actions as the subscription owner; the last argument to
    3252                 :      * ExecuteTruncateGuts tells it whether we want to switch to the table
    3253                 :      * owner. Those are exactly opposite conditions.
    3254                 :      */
    3255 GIC          17 :     ExecuteTruncateGuts(rels,
    3256 ECB             :                         relids,
    3257                 :                         relids_logged,
    3258                 :                         DROP_RESTRICT,
    3259                 :                         restart_seqs,
    3260 GNC          17 :                         !MySubscription->runasowner);
    3261 GIC          43 :     foreach(lc, remote_rels)
    3262                 :     {
    3263 CBC          26 :         LogicalRepRelMapEntry *rel = lfirst(lc);
    3264                 : 
    3265              26 :         logicalrep_rel_close(rel, NoLock);
    3266 ECB             :     }
    3267 GIC          24 :     foreach(lc, part_rels)
    3268 ECB             :     {
    3269 GIC           7 :         Relation    rel = lfirst(lc);
    3270 ECB             : 
    3271 GIC           7 :         table_close(rel, NoLock);
    3272                 :     }
    3273                 : 
    3274              17 :     end_replication_step();
    3275                 : }
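For each partitioned table named in a TRUNCATE message, the function adds every relation that find_all_inheritors() returns, and that list includes the parent itself. The list_member_oid() check skips any relation that has already been collected. The simplified sketch below covers only that collection step. It assumes OIDs are plain unsigned integers and that each named table's inheritor list is supplied up front; `OidList`, `oid_list_member`, `oid_list_append` and `collect_truncate_targets` are hypothetical. Locking, privilege checks and the temp-table filter are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;       /* stand-in for PostgreSQL's Oid */

#define MAX_RELIDS 32

typedef struct OidList { Oid items[MAX_RELIDS]; int len; } OidList;

static bool oid_list_member(const OidList *l, Oid oid)
{
    for (int i = 0; i < l->len; i++)
        if (l->items[i] == oid)
            return true;
    return false;
}

static void oid_list_append(OidList *l, Oid oid)
{
    assert(l->len < MAX_RELIDS);
    l->items[l->len++] = oid;
}

/*
 * named[i] is a relation from the TRUNCATE message. children[i] lists its
 * inheritors with the parent first, as find_all_inheritors() returns them;
 * for a plain table nchildren[i] is 0 and children[i] may be NULL.
 */
static void collect_truncate_targets(const Oid *named, int nnamed,
                                     const Oid **children,
                                     const int *nchildren,
                                     OidList *relids)
{
    relids->len = 0;
    for (int i = 0; i < nnamed; i++)
    {
        oid_list_append(relids, named[i]);
        for (int j = 0; j < nchildren[i]; j++)
        {
            /* Skip the parent itself and anything already collected. */
            if (oid_list_member(relids, children[i][j]))
                continue;
            oid_list_append(relids, children[i][j]);
        }
    }
}
```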
    3276                 : 
    3277                 : 
    3278                 : /*
    3279 ECB             :  * Logical replication protocol message dispatcher.
    3280                 :  */
    3281                 : void
    3282 GIC      336698 : apply_dispatch(StringInfo s)
    3283 ECB             : {
    3284 GIC      336698 :     LogicalRepMsgType action = pq_getmsgbyte(s);
    3285                 :     LogicalRepMsgType saved_command;
    3286 ECB             : 
    3287                 :     /*
    3288                 :      * Set the current command being applied. Since this function can be
    3289                 :      * called recursively when applying spooled changes, save the current
    3290                 :      * command.
    3291                 :      */
    3292 GIC      336698 :     saved_command = apply_error_callback_arg.command;
    3293 CBC      336698 :     apply_error_callback_arg.command = action;
    3294 ECB             : 
    3295 GIC      336698 :     switch (action)
    3296                 :     {
    3297             380 :         case LOGICAL_REP_MSG_BEGIN:
    3298             380 :             apply_handle_begin(s);
    3299             380 :             break;
    3300                 : 
    3301 CBC         355 :         case LOGICAL_REP_MSG_COMMIT:
    3302 GIC         355 :             apply_handle_commit(s);
    3303             355 :             break;
    3304                 : 
    3305          185646 :         case LOGICAL_REP_MSG_INSERT:
    3306          185646 :             apply_handle_insert(s);
    3307 CBC      185624 :             break;
    3308 ECB             : 
    3309 GIC       66137 :         case LOGICAL_REP_MSG_UPDATE:
    3310           66137 :             apply_handle_update(s);
    3311 CBC       66131 :             break;
    3312 ECB             : 
    3313 GIC       81918 :         case LOGICAL_REP_MSG_DELETE:
    3314           81918 :             apply_handle_delete(s);
    3315           81918 :             break;
    3316                 : 
    3317              17 :         case LOGICAL_REP_MSG_TRUNCATE:
    3318 GBC          17 :             apply_handle_truncate(s);
    3319 GIC          17 :             break;
    3320 EUB             : 
    3321 GIC         390 :         case LOGICAL_REP_MSG_RELATION:
    3322             390 :             apply_handle_relation(s);
    3323             390 :             break;
    3324                 : 
    3325              18 :         case LOGICAL_REP_MSG_TYPE:
    3326              18 :             apply_handle_type(s);
    3327 GBC          18 :             break;
    3328                 : 
    3329 GIC           7 :         case LOGICAL_REP_MSG_ORIGIN:
    3330               7 :             apply_handle_origin(s);
    3331               7 :             break;
    3332                 : 
    3333 UIC           0 :         case LOGICAL_REP_MSG_MESSAGE:
    3334                 : 
    3335                 :             /*
    3336                 :              * Logical replication does not use generic logical messages yet.
    3337                 :              * However, they could be used by other applications that use this
    3338                 :              * output plugin.
    3339                 :              */
    3340               0 :             break;
    3341                 : 
    3342 CBC         835 :         case LOGICAL_REP_MSG_STREAM_START:
    3343 GIC         835 :             apply_handle_stream_start(s);
    3344             835 :             break;
    3345                 : 
    3346             834 :         case LOGICAL_REP_MSG_STREAM_STOP:
    3347             834 :             apply_handle_stream_stop(s);
    3348             832 :             break;
    3349                 : 
    3350              38 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    3351              38 :             apply_handle_stream_abort(s);
    3352              38 :             break;
    3353                 : 
    3354              61 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    3355              61 :             apply_handle_stream_commit(s);
    3356              59 :             break;
    3357                 : 
    3358              14 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    3359              14 :             apply_handle_begin_prepare(s);
    3360              14 :             break;
    3361 ECB             : 
    3362 CBC          13 :         case LOGICAL_REP_MSG_PREPARE:
    3363              13 :             apply_handle_prepare(s);
    3364 GIC          13 :             break;
    3365 ECB             : 
    3366 GIC          19 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    3367 CBC          19 :             apply_handle_commit_prepared(s);
    3368 GIC          19 :             break;
    3369 ECB             : 
    3370 CBC           5 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    3371 GIC           5 :             apply_handle_rollback_prepared(s);
    3372               5 :             break;
    3373                 : 
    3374              11 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    3375              11 :             apply_handle_stream_prepare(s);
    3376 GBC          11 :             break;
    3377 EUB             : 
    3378 UBC           0 :         default:
    3379 UIC           0 :             ereport(ERROR,
    3380                 :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    3381                 :                      errmsg("invalid logical replication message type \"%c\"", action)));
    3382 ECB             :     }
    3383                 : 
    3384                 :     /* Reset the current command */
    3385 CBC      336666 :     apply_error_callback_arg.command = saved_command;
    3386 GIC      336666 : }
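apply_dispatch() saves apply_error_callback_arg.command before overwriting it and restores the saved value on the way out. This matters because applying spooled changes calls apply_dispatch() again for each stored message. The sketch below shows why the restore is needed, using a hypothetical 'S' message that replays the messages following it. `current_command`, `trace`, `apply_one` and `dispatch` are illustrative stand-ins, not worker.c symbols.

```c
#include <assert.h>

/* Hypothetical stand-in for apply_error_callback_arg.command. */
static char current_command = '\0';

/* Records the command an error callback would have seen at each step. */
static char trace[64];
static int ntrace = 0;

static void apply_one(char action)
{
    /* The command being applied must be the one currently advertised. */
    assert(current_command == action);
    assert(ntrace < (int) sizeof(trace));
    trace[ntrace++] = current_command;
}

/*
 * Dispatch the message whose type is msg[0]. A hypothetical 'S' message
 * replays the one-byte messages that follow it by recursing into
 * dispatch(), the way spooled changes re-enter apply_dispatch().
 */
static void dispatch(const char *msg)
{
    char saved = current_command;   /* save the caller's command */

    current_command = msg[0];
    if (msg[0] == 'S')
    {
        for (const char *p = msg + 1; *p != '\0'; p++)
        {
            char sub[2] = {*p, '\0'};

            dispatch(sub);
        }
    }
    apply_one(msg[0]);

    current_command = saved;        /* restore it for the caller */
}
```

Without the restore, the outer 'S' would still be advertising the last nested command ('U' for the input "SIU") when it finishes, and the assertion in apply_one() would fail.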
    3387                 : 
    3388                 : /*
    3389                 :  * Figure out which write/flush positions to report to the walsender process.
    3390                 :  *
    3391 ECB             :  * We can't simply report back the last LSN the walsender sent us because the
    3392                 :  * local transaction might not yet be flushed to disk locally. Instead we
    3393                 :  * build a list that associates local with remote LSNs for every commit. When
    3394                 :  * reporting back the flush position to the sender we iterate that list and
    3395                 :  * check which entries on it are already locally flushed. Those we can report
    3396                 :  * as having been flushed.
    3397                 :  *
    3398                 :  * have_pending_txes is set to true if there are outstanding transactions
    3399                 :  * that still need to be flushed.
    3400                 :  */
    3401                 : static void
    3402 GIC       58743 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
    3403                 :                    bool *have_pending_txes)
    3404                 : {
    3405                 :     dlist_mutable_iter iter;
    3406           58743 :     XLogRecPtr  local_flush = GetFlushRecPtr(NULL);
    3407                 : 
    3408           58743 :     *write = InvalidXLogRecPtr;
    3409           58743 :     *flush = InvalidXLogRecPtr;
    3410 ECB             : 
    3411 CBC       59146 :     dlist_foreach_modify(iter, &lsn_mapping)
    3412                 :     {
    3413            7251 :         FlushPosition *pos =
    3414            7251 :         dlist_container(FlushPosition, node, iter.cur);
    3415                 : 
    3416            7251 :         *write = pos->remote_end;
    3417                 : 
    3418            7251 :         if (pos->local_end <= local_flush)
    3419 ECB             :         {
    3420 CBC         403 :             *flush = pos->remote_end;
    3421             403 :             dlist_delete(iter.cur);
    3422 GIC         403 :             pfree(pos);
    3423                 :         }
    3424                 :         else
    3425                 :         {
    3426                 :             /*
    3427 ECB             :              * Don't want to uselessly iterate over the rest of the list which
    3428                 :              * could potentially be long. Instead get the last element and
    3429                 :              * grab the write position from there.
    3430                 :              */
    3431 GIC        6848 :             pos = dlist_tail_element(FlushPosition, node,
    3432                 :                                      &lsn_mapping);
    3433 CBC        6848 :             *write = pos->remote_end;
    3434            6848 :             *have_pending_txes = true;
    3435 GIC        6848 :             return;
    3436                 :         }
    3437 ECB             :     }
    3438                 : 
    3439 GIC       51895 :     *have_pending_txes = !dlist_is_empty(&lsn_mapping);
    3440 ECB             : }
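The lsn_mapping list pairs the remote commit LSN of each applied transaction with the LSN of the corresponding local commit. get_flush_position() pops entries whose local LSN is already flushed and reports the newest of them as the flush position. The write position is always the newest remote LSN in the list. The sketch below reproduces that bookkeeping with a hand-rolled singly linked FIFO in place of PostgreSQL's dlist. It assumes LSNs are plain 64-bit integers and that the caller passes in the local flush LSN; the real function calls GetFlushRecPtr() itself. It also omits the early return that store_flush_position() takes in parallel apply workers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint64_t XLogRecPtr;        /* stand-in for PostgreSQL's type */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef struct FlushPosition {
    XLogRecPtr local_end;           /* LSN of the local commit */
    XLogRecPtr remote_end;          /* matching remote commit LSN */
    struct FlushPosition *next;
} FlushPosition;

/* FIFO of commit positions, oldest first (plays the role of lsn_mapping). */
typedef struct { FlushPosition *head, *tail; } LsnMapping;

static void store_flush_position(LsnMapping *m, XLogRecPtr remote_lsn,
                                 XLogRecPtr local_lsn)
{
    FlushPosition *pos = malloc(sizeof(FlushPosition));

    assert(pos != NULL);
    pos->local_end = local_lsn;
    pos->remote_end = remote_lsn;
    pos->next = NULL;
    if (m->tail)
        m->tail->next = pos;
    else
        m->head = pos;
    m->tail = pos;
}

static void get_flush_position(LsnMapping *m, XLogRecPtr local_flush,
                               XLogRecPtr *write, XLogRecPtr *flush,
                               bool *have_pending_txes)
{
    *write = InvalidXLogRecPtr;
    *flush = InvalidXLogRecPtr;

    while (m->head != NULL)
    {
        FlushPosition *pos = m->head;

        *write = pos->remote_end;
        if (pos->local_end <= local_flush)
        {
            /* Durable locally: report it as flushed and forget it. */
            *flush = pos->remote_end;
            m->head = pos->next;
            if (m->head == NULL)
                m->tail = NULL;
            free(pos);
        }
        else
        {
            /* The rest is written but not flushed; report the newest write. */
            *write = m->tail->remote_end;
            *have_pending_txes = true;
            return;
        }
    }
    *have_pending_txes = false;
}
```

Keeping the list in commit order is what makes the early exit valid. Local commits are written in order, so once one entry is not yet flushed locally, no later entry can be either.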
    3441                 : 
    3442                 : /*
    3443                 :  * Store current remote/local lsn pair in the tracking list.
    3444                 :  */
    3445                 : void
    3446 GNC         459 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
    3447                 : {
    3448 ECB             :     FlushPosition *flushpos;
    3449                 : 
    3450                 :     /*
    3451                 :      * Skip for parallel apply workers, because the lsn_mapping is maintained
    3452                 :      * by the leader apply worker.
    3453                 :      */
    3454 GNC         459 :     if (am_parallel_apply_worker())
    3455              19 :         return;
    3456                 : 
    3457 ECB             :     /* Need to do this in permanent context */
    3458 GIC         440 :     MemoryContextSwitchTo(ApplyContext);
    3459                 : 
    3460                 :     /* Track commit lsn  */
    3461             440 :     flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    3462 GNC         440 :     flushpos->local_end = local_lsn;
    3463 GIC         440 :     flushpos->remote_end = remote_lsn;
    3464                 : 
    3465             440 :     dlist_push_tail(&lsn_mapping, &flushpos->node);
    3466 CBC         440 :     MemoryContextSwitchTo(ApplyMessageContext);
    3467                 : }
    3468                 : 
    3469                 : 
    3470                 : /* Update statistics of the worker. */
    3471                 : static void
    3472          185649 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
    3473 ECB             : {
    3474 CBC      185649 :     MyLogicalRepWorker->last_lsn = last_lsn;
    3475 GIC      185649 :     MyLogicalRepWorker->last_send_time = send_time;
    3476          185649 :     MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    3477          185649 :     if (reply)
    3478                 :     {
    3479            1295 :         MyLogicalRepWorker->reply_lsn = last_lsn;
    3480 CBC        1295 :         MyLogicalRepWorker->reply_time = send_time;
    3481 ECB             :     }
    3482 GIC      185649 : }
    3483 ECB             : 
    3484                 : /*
    3485                 :  * Apply main loop.
    3486                 :  */
    3487                 : static void
    3488 GIC         282 : LogicalRepApplyLoop(XLogRecPtr last_received)
    3489                 : {
    3490             282 :     TimestampTz last_recv_timestamp = GetCurrentTimestamp();
    3491             282 :     bool        ping_sent = false;
    3492                 :     TimeLineID  tli;
    3493                 :     ErrorContextCallback errcallback;
    3494 ECB             : 
    3495                 :     /*
    3496                 :      * Init the ApplyMessageContext which we clean up after each replication
    3497                 :      * protocol message.
    3498                 :      */
    3499 CBC         282 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
    3500                 :                                                 "ApplyMessageContext",
    3501 ECB             :                                                 ALLOCSET_DEFAULT_SIZES);
    3502                 : 
    3503                 :     /*
    3504                 :      * This memory context is used for per-stream data when the streaming mode
    3505                 :      * is enabled. This context is reset on each stream stop.
    3506                 :      */
    3507 GIC         282 :     LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
    3508                 :                                                     "LogicalStreamingContext",
    3509                 :                                                     ALLOCSET_DEFAULT_SIZES);
    3510                 : 
    3511                 :     /* mark as idle, before starting to loop */
    3512             282 :     pgstat_report_activity(STATE_IDLE, NULL);
    3513                 : 
    3514                 :     /*
    3515                 :      * Push apply error context callback. Fields will be filled while applying
    3516 ECB             :      * a change.
    3517                 :      */
    3518 GIC         282 :     errcallback.callback = apply_error_callback;
    3519             282 :     errcallback.previous = error_context_stack;
    3520             282 :     error_context_stack = &errcallback;
    3521 GNC         282 :     apply_error_context_stack = error_context_stack;
    3522                 : 
    3523                 :     /* This outer loop iterates once per wait. */
    3524 ECB             :     for (;;)
    3525 CBC       57102 :     {
    3526           57384 :         pgsocket    fd = PGINVALID_SOCKET;
    3527                 :         int         rc;
    3528                 :         int         len;
    3529 GIC       57384 :         char       *buf = NULL;
    3530           57384 :         bool        endofstream = false;
    3531                 :         long        wait_time;
    3532                 : 
    3533           57384 :         CHECK_FOR_INTERRUPTS();
    3534 ECB             : 
    3535 GIC       57384 :         MemoryContextSwitchTo(ApplyMessageContext);
    3536                 : 
    3537           57384 :         len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
    3538                 : 
    3539           57371 :         if (len != 0)
    3540                 :         {
    3541                 :             /* Loop to process all available data (without blocking). */
    3542                 :             for (;;)
    3543                 :             {
    3544          242244 :                 CHECK_FOR_INTERRUPTS();
    3545                 : 
    3546          242244 :                 if (len == 0)
    3547                 :                 {
    3548           56592 :                     break;
    3549                 :                 }
    3550 CBC      185652 :                 else if (len < 0)
    3551 ECB             :                 {
    3552 CBC           3 :                     ereport(LOG,
    3553                 :                             (errmsg("data stream from publisher has ended")));
    3554               3 :                     endofstream = true;
    3555 GIC           3 :                     break;
    3556 ECB             :                 }
    3557                 :                 else
    3558                 :                 {
    3559                 :                     int         c;
    3560                 :                     StringInfoData s;
    3561                 : 
    3562                 :                     /* Reset timeout. */
    3563 GIC      185649 :                     last_recv_timestamp = GetCurrentTimestamp();
    3564 GBC      185649 :                     ping_sent = false;
    3565 EUB             : 
    3566                 :                     /* Ensure we are reading the data into our memory context. */
    3567 GIC      185649 :                     MemoryContextSwitchTo(ApplyMessageContext);
    3568                 : 
    3569          185649 :                     s.data = buf;
    3570 CBC      185649 :                     s.len = len;
    3571 GIC      185649 :                     s.cursor = 0;
    3572          185649 :                     s.maxlen = -1;
    3573 ECB             : 
    3574 GIC      185649 :                     c = pq_getmsgbyte(&s);
    3575                 : 
    3576          185649 :                     if (c == 'w')
    3577                 :                     {
    3578                 :                         XLogRecPtr  start_lsn;
    3579 ECB             :                         XLogRecPtr  end_lsn;
    3580                 :                         TimestampTz send_time;
    3581                 : 
    3582 GIC      184354 :                         start_lsn = pq_getmsgint64(&s);
    3583          184354 :                         end_lsn = pq_getmsgint64(&s);
    3584 CBC      184354 :                         send_time = pq_getmsgint64(&s);
    3585 ECB             : 
    3586 CBC      184354 :                         if (last_received < start_lsn)
    3587          148772 :                             last_received = start_lsn;
    3588                 : 
    3589 GIC      184354 :                         if (last_received < end_lsn)
    3590 UIC           0 :                             last_received = end_lsn;
    3591 ECB             : 
    3592 CBC      184354 :                         UpdateWorkerStats(last_received, send_time, false);
    3593 ECB             : 
    3594 GIC      184354 :                         apply_dispatch(&s);
    3595                 :                     }
    3596 CBC        1295 :                     else if (c == 'k')
    3597 ECB             :                     {
    3598                 :                         XLogRecPtr  end_lsn;
    3599                 :                         TimestampTz timestamp;
    3600                 :                         bool        reply_requested;
    3601                 : 
    3602 GIC        1295 :                         end_lsn = pq_getmsgint64(&s);
    3603 CBC        1295 :                         timestamp = pq_getmsgint64(&s);
    3604 GIC        1295 :                         reply_requested = pq_getmsgbyte(&s);
    3605                 : 
    3606 CBC        1295 :                         if (last_received < end_lsn)
    3607 GIC         678 :                             last_received = end_lsn;
    3608 ECB             : 
    3609 CBC        1295 :                         send_feedback(last_received, reply_requested, false);
    3610 GIC        1295 :                         UpdateWorkerStats(last_received, timestamp, true);
    3611 ECB             :                     }
    3612                 :                     /* other message types are purposefully ignored */
    3613                 : 
    3614 GIC      185621 :                     MemoryContextReset(ApplyMessageContext);
    3615                 :                 }
    3616                 : 
    3617          185621 :                 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
    3618                 :             }
    3619                 :         }
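Each CopyData message from the walsender starts with a type byte. An XLogData ('w') message carries three big-endian 64-bit fields (WAL start, WAL end and send time), followed by the logical replication message that apply_dispatch() decodes. A primary keepalive ('k') carries the WAL end, the send time and a one-byte reply-requested flag. The sketch below parses both headers and advances last_received the same way the loop above does, so that it never moves backwards. It assumes a flat byte buffer; `read_be64`, `CopyMsgInfo`, `parse_copy_msg` and `advance_last_received` are hypothetical helpers rather than the pq_getmsg*() API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Read a big-endian (network order) 64-bit integer, as pq_getmsgint64 does. */
static uint64_t read_be64(const unsigned char *p)
{
    uint64_t v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

typedef struct CopyMsgInfo {
    char type;              /* 'w' (XLogData) or 'k' (keepalive) */
    uint64_t start_lsn;     /* 'w' only; 0 for 'k' */
    uint64_t end_lsn;       /* WAL end reported by the sender */
    int64_t send_time;      /* sender clock, microseconds since 2000-01-01 */
    bool reply_requested;   /* 'k' only */
    size_t payload_offset;  /* 'w' only: where the logical message starts */
} CopyMsgInfo;

/* Parse one CopyData message; returns false for short or unknown input. */
static bool parse_copy_msg(const unsigned char *buf, size_t len, CopyMsgInfo *out)
{
    assert(out != NULL);
    if (len < 1)
        return false;
    out->type = (char) buf[0];
    if (out->type == 'w' && len >= 25)
    {
        out->start_lsn = read_be64(buf + 1);
        out->end_lsn = read_be64(buf + 9);
        out->send_time = (int64_t) read_be64(buf + 17);
        out->reply_requested = false;
        out->payload_offset = 25;
        return true;
    }
    if (out->type == 'k' && len >= 18)
    {
        out->start_lsn = 0;
        out->end_lsn = read_be64(buf + 1);
        out->send_time = (int64_t) read_be64(buf + 9);
        out->reply_requested = buf[17] != 0;
        out->payload_offset = len;
        return true;
    }
    return false;           /* the real loop simply ignores other types */
}

/* Advance last_received like the apply loop: it never moves backwards. */
static uint64_t advance_last_received(uint64_t last_received, const CopyMsgInfo *m)
{
    if (m->type == 'w' && last_received < m->start_lsn)
        last_received = m->start_lsn;
    if (last_received < m->end_lsn)
        last_received = m->end_lsn;
    return last_received;
}
```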
    3620                 : 
    3621                 :         /* confirm all writes so far */
    3622 CBC       57343 :         send_feedback(last_received, false, false);
    3623                 : 
    3624 GIC       57343 :         if (!in_remote_transaction && !in_streamed_transaction)
    3625                 :         {
    3626                 :             /*
    3627 ECB             :              * If we didn't get any transactions for a while there might be
    3628                 :              * unconsumed invalidation messages in the queue, consume them
    3629                 :              * now.
    3630                 :              */
    3631 GIC        2237 :             AcceptInvalidationMessages();
    3632            2237 :             maybe_reread_subscription();
    3633                 : 
    3634 ECB             :             /* Process any table synchronization changes. */
    3635 CBC        2206 :             process_syncing_tables(last_received);
    3636                 :         }
    3637 ECB             : 
    3638                 :         /* Cleanup the memory. */
    3639 GIC       57158 :         MemoryContextResetAndDeleteChildren(ApplyMessageContext);
    3640           57158 :         MemoryContextSwitchTo(TopMemoryContext);
    3641 ECB             : 
    3642                 :         /* Check if we need to exit the streaming loop. */
    3643 CBC       57158 :         if (endofstream)
    3644 GIC           3 :             break;
    3645                 : 
    3646 ECB             :         /*
    3647                 :          * Wait for more data or latch.  If we have unflushed transactions,
    3648                 :          * wake up after WalWriterDelay to see if they've been flushed yet (in
    3649                 :          * which case we should send a feedback message).  Otherwise, there's
    3650                 :          * no particular urgency about waking up unless we get data or a
    3651                 :          * signal.
    3652                 :          */
    3653 GIC       57155 :         if (!dlist_is_empty(&lsn_mapping))
    3654            6346 :             wait_time = WalWriterDelay;
    3655                 :         else
    3656           50809 :             wait_time = NAPTIME_PER_CYCLE;
    3657 ECB             : 
    3658 GIC       57155 :         rc = WaitLatchOrSocket(MyLatch,
    3659                 :                                WL_SOCKET_READABLE | WL_LATCH_SET |
    3660                 :                                WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    3661                 :                                fd, wait_time,
    3662                 :                                WAIT_EVENT_LOGICAL_APPLY_MAIN);
    3663                 : 
    3664 CBC       57155 :         if (rc & WL_LATCH_SET)
    3665 ECB             :         {
    3666 CBC         431 :             ResetLatch(MyLatch);
    3667 GIC         431 :             CHECK_FOR_INTERRUPTS();
    3668                 :         }
    3669                 : 
    3670           57102 :         if (ConfigReloadPending)
    3671                 :         {
    3672              13 :             ConfigReloadPending = false;
    3673              13 :             ProcessConfigFile(PGC_SIGHUP);
    3674                 :         }
    3675                 : 
    3676 CBC       57102 :         if (rc & WL_TIMEOUT)
    3677                 :         {
    3678                 :             /*
    3679                 :              * We didn't receive anything new. If we haven't heard anything
    3680                 :              * from the server for more than wal_receiver_timeout / 2, ping
    3681                 :              * the server. Also, if it's been longer than
    3682                 :              * wal_receiver_status_interval since the last update we sent,
    3683                 :              * send a status update to the primary anyway, to report any
    3684                 :              * progress in applying WAL.
    3685                 :              */
    3686 GIC         105 :             bool        requestReply = false;
    3687                 : 
    3688 ECB             :             /*
    3689                 :              * Check if time since last receive from primary has reached the
    3690                 :              * configured limit.
    3691                 :              */
    3692 CBC         105 :             if (wal_receiver_timeout > 0)
    3693                 :             {
    3694 GIC         105 :                 TimestampTz now = GetCurrentTimestamp();
    3695 ECB             :                 TimestampTz timeout;
    3696                 : 
    3697 GIC         105 :                 timeout =
    3698             105 :                     TimestampTzPlusMilliseconds(last_recv_timestamp,
    3699                 :                                                 wal_receiver_timeout);
    3700 ECB             : 
    3701 GIC         105 :                 if (now >= timeout)
    3702 UIC           0 :                     ereport(ERROR,
    3703 ECB             :                             (errcode(ERRCODE_CONNECTION_FAILURE),
    3704                 :                              errmsg("terminating logical replication worker due to timeout")));
    3705                 : 
    3706                 :                 /* Check to see if it's time for a ping. */
    3707 GIC         105 :                 if (!ping_sent)
    3708                 :                 {
    3709             105 :                     timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
    3710 ECB             :                                                           (wal_receiver_timeout / 2));
    3711 GIC         105 :                     if (now >= timeout)
    3712                 :                     {
    3713 UIC           0 :                         requestReply = true;
    3714               0 :                         ping_sent = true;
    3715 ECB             :                     }
    3716                 :                 }
    3717                 :             }
    3718                 : 
    3719 GIC         105 :             send_feedback(last_received, requestReply, requestReply);
    3720                 : 
    3721                 :             /*
    3722                 :              * Force reporting to ensure long idle periods don't lead to
    3723                 :              * arbitrarily delayed stats. Stats can only be reported outside
    3724                 :              * of (implicit or explicit) transactions. That shouldn't lead to
    3725                 :              * stats being delayed for long, because transactions are either
    3726 ECB             :              * sent as a whole on commit or streamed. Streamed transactions
    3727                 :              * are spilled to disk and applied on commit.
    3728                 :              */
    3729 GIC         105 :             if (!IsTransactionState())
    3730 CBC         105 :                 pgstat_report_stat(true);
    3731 ECB             :         }
    3732                 :     }
    3733                 : 
    3734                 :     /* Pop the error context stack */
    3735 GIC           3 :     error_context_stack = errcallback.previous;
    3736 GNC           3 :     apply_error_context_stack = error_context_stack;
    3737                 : 
    3738 ECB             :     /* All done */
    3739 GIC           3 :     walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
    3740 UIC           0 : }
    3741                 : 
    3742                 : /*
    3743 ECB             :  * Send a Standby Status Update message to the server.
    3744                 :  *
    3745                 :  * 'recvpos' is the latest LSN up to which we've received data; 'force' is set
    3746                 :  * if we need to send a response to avoid timeouts.
    3747                 :  */
    3748                 : static void
    3749 GIC       58743 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    3750                 : {
    3751                 :     static StringInfo reply_message = NULL;
    3752                 :     static TimestampTz send_time = 0;
    3753                 : 
    3754                 :     static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    3755 ECB             :     static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    3756                 :     static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
    3757                 : 
    3758                 :     XLogRecPtr  writepos;
    3759                 :     XLogRecPtr  flushpos;
    3760                 :     TimestampTz now;
    3761                 :     bool        have_pending_txes;
    3762                 : 
    3763                 :     /*
    3764                 :      * If the user doesn't want status to be reported to the publisher, be
    3765                 :      * sure to exit before doing anything at all.
    3766                 :      */
    3767 CBC       58743 :     if (!force && wal_receiver_status_interval <= 0)
    3768           15273 :         return;
    3769                 : 
    3770 ECB             :     /* It's legal to not pass a recvpos */
    3771 CBC       58743 :     if (recvpos < last_recvpos)
    3772 UIC           0 :         recvpos = last_recvpos;
    3773                 : 
    3774 GIC       58743 :     get_flush_position(&writepos, &flushpos, &have_pending_txes);
    3775                 : 
    3776 ECB             :     /*
    3777                 :      * No outstanding transactions to flush, we can report the latest received
    3778                 :      * position. This is important for synchronous replication.
    3779                 :      */
    3780 GIC       58743 :     if (!have_pending_txes)
    3781           51895 :         flushpos = writepos = recvpos;
    3782 ECB             : 
    3783 GIC       58743 :     if (writepos < last_writepos)
    3784 LBC           0 :         writepos = last_writepos;
    3785                 : 
    3786 CBC       58743 :     if (flushpos < last_flushpos)
    3787 GIC        6812 :         flushpos = last_flushpos;
    3788                 : 
    3789 CBC       58743 :     now = GetCurrentTimestamp();
    3790                 : 
    3791 ECB             :     /* if we've already reported everything we're good */
    3792 CBC       58743 :     if (!force &&
    3793 GIC       58742 :         writepos == last_writepos &&
    3794 CBC       15447 :         flushpos == last_flushpos &&
    3795 GIC       15350 :         !TimestampDifferenceExceeds(send_time, now,
    3796 ECB             :                                     wal_receiver_status_interval * 1000))
    3797 CBC       15273 :         return;
    3798 GIC       43470 :     send_time = now;
    3799                 : 
    3800 CBC       43470 :     if (!reply_message)
    3801                 :     {
    3802             282 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    3803                 : 
    3804 GIC         282 :         reply_message = makeStringInfo();
    3805             282 :         MemoryContextSwitchTo(oldctx);
    3806                 :     }
    3807                 :     else
    3808           43188 :         resetStringInfo(reply_message);
    3809                 : 
    3810           43470 :     pq_sendbyte(reply_message, 'r');
    3811           43470 :     pq_sendint64(reply_message, recvpos);   /* write */
    3812           43470 :     pq_sendint64(reply_message, flushpos);  /* flush */
    3813           43470 :     pq_sendint64(reply_message, writepos);  /* apply */
    3814           43470 :     pq_sendint64(reply_message, now);   /* sendTime */
    3815           43470 :     pq_sendbyte(reply_message, requestReply);   /* replyRequested */
    3816                 : 
    3817 CBC       43470 :     elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
    3818                 :          force,
    3819                 :          LSN_FORMAT_ARGS(recvpos),
    3820                 :          LSN_FORMAT_ARGS(writepos),
    3821 ECB             :          LSN_FORMAT_ARGS(flushpos));
    3822                 : 
    3823 GIC       43470 :     walrcv_send(LogRepWorkerWalRcvConn,
    3824                 :                 reply_message->data, reply_message->len);
    3825                 : 
    3826           43470 :     if (recvpos > last_recvpos)
    3827           43295 :         last_recvpos = recvpos;
    3828           43470 :     if (writepos > last_writepos)
    3829 CBC       43295 :         last_writepos = writepos;
    3830 GIC       43470 :     if (flushpos > last_flushpos)
    3831           43070 :         last_flushpos = flushpos;
    3832                 : }
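send_feedback builds a Standby Status Update message: the byte 'r', four big-endian 64-bit fields (write, flush, and apply positions, then the send time in microseconds since 2000-01-01), and a final reply-requested byte, 34 bytes in total. Note that the listing fills the three position slots from recvpos, flushpos, and writepos, in that order. The following is a minimal sketch of the same layout using plain byte buffers. `put_int64` and `encode_status_update` are hypothetical helpers standing in for pq_sendbyte and pq_sendint64.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* 'r' + write + flush + apply + sendTime + replyRequested */
#define STATUS_UPDATE_LEN (1 + 4 * 8 + 1)

/* Store v at buf[off..off+7] in network byte order (most significant first). */
static size_t
put_int64(unsigned char *buf, size_t off, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
        buf[off++] = (unsigned char) (v >> (i * 8));
    return off;
}

/* Fill buf (at least STATUS_UPDATE_LEN bytes); returns bytes written. */
static size_t
encode_status_update(unsigned char *buf, uint64_t write_lsn,
                     uint64_t flush_lsn, uint64_t apply_lsn,
                     int64_t send_time, bool reply_requested)
{
    size_t      off = 0;

    assert(buf != NULL);
    buf[off++] = 'r';
    off = put_int64(buf, off, write_lsn);
    off = put_int64(buf, off, flush_lsn);
    off = put_int64(buf, off, apply_lsn);
    off = put_int64(buf, off, (uint64_t) send_time);
    buf[off++] = reply_requested ? 1 : 0;
    return off;
}
```

Because every field has a fixed width, the message length does not depend on its contents. That is why the listing can reuse a single StringInfo buffer across calls and simply reset it.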
    3833 ECB             : 
    3834                 : /*
    3835                 :  * Exit routine for apply workers due to subscription parameter changes.
    3836                 :  */
    3837                 : static void
    3838 GNC          31 : apply_worker_exit(void)
    3839                 : {
    3840              31 :     if (am_parallel_apply_worker())
    3841                 :     {
    3842                 :         /*
    3843                 :          * Don't stop the parallel apply worker as the leader will detect the
    3844                 :          * subscription parameter change and restart logical replication later
    3845                 :          * anyway. This also prevents the leader from reporting errors when
    3846                 :          * trying to communicate with a stopped parallel apply worker, which
    3847                 :          * would accidentally disable subscriptions if disable_on_error was
    3848                 :          * set.
    3849                 :          */
    3850 UNC           0 :         return;
    3851                 :     }
    3852                 : 
    3853                 :     /*
    3854                 :      * Reset the last-start time for this apply worker so that the launcher
    3855                 :      * will restart it without waiting for wal_retrieve_retry_interval if the
    3856                 :      * subscription is still active, and so that we won't leak that hash table
    3857                 :      * entry if it isn't.
    3858                 :      */
    3859 GNC          31 :     if (!am_tablesync_worker())
    3860              31 :         ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    3861                 : 
    3862              31 :     proc_exit(0);
    3863                 : }
    3864                 : 
    3865                 : /*
    3866                 :  * Reread subscription info if needed. Most changes will cause the worker to exit.
    3867                 :  */
    3868                 : void
    3869 GIC        3141 : maybe_reread_subscription(void)
    3870                 : {
    3871 ECB             :     MemoryContext oldctx;
    3872                 :     Subscription *newsub;
    3873 GIC        3141 :     bool        started_tx = false;
    3874 ECB             : 
    3875                 :     /* When cache state is valid there is nothing to do here. */
    3876 GIC        3141 :     if (MySubscriptionValid)
    3877            3084 :         return;
    3878                 : 
    3879                 :     /* This function might be called inside or outside of transaction. */
    3880 CBC          57 :     if (!IsTransactionState())
    3881 ECB             :     {
    3882 GIC          55 :         StartTransactionCommand();
    3883 CBC          55 :         started_tx = true;
    3884                 :     }
    3885                 : 
    3886                 :     /* Ensure allocations in permanent context. */
    3887 GIC          57 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    3888                 : 
    3889              57 :     newsub = GetSubscription(MyLogicalRepWorker->subid, true);
    3890                 : 
    3891                 :     /*
    3892                 :      * Exit if the subscription was removed. This normally should not happen
    3893 ECB             :      * as the worker gets killed during DROP SUBSCRIPTION.
    3894                 :      */
    3895 GIC          57 :     if (!newsub)
    3896 ECB             :     {
    3897 LBC           0 :         ereport(LOG,
    3898                 :         /* translator: first %s is the name of logical replication worker */
    3899                 :                 (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
    3900                 :                         get_worker_name(), MySubscription->name)));
    3901 ECB             : 
    3902                 :         /* Ensure we remove no-longer-useful entry for worker's start time */
    3903 UNC           0 :         if (!am_tablesync_worker() && !am_parallel_apply_worker())
    3904               0 :             ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    3905 LBC           0 :         proc_exit(0);
    3906                 :     }
    3907                 : 
    3908                 :     /* Exit if the subscription was disabled. */
    3909 GIC          57 :     if (!newsub->enabled)
    3910                 :     {
    3911               2 :         ereport(LOG,
    3912                 :         /* translator: first %s is the name of logical replication worker */
    3913                 :                 (errmsg("%s for subscription \"%s\" will stop because the subscription was disabled",
    3914                 :                         get_worker_name(), MySubscription->name)));
    3915                 : 
    3916 GNC           2 :         apply_worker_exit();
    3917                 :     }
    3918 ECB             : 
    3919                 :     /* !slotname should never happen when enabled is true. */
    3920 GIC          55 :     Assert(newsub->slotname);
    3921 ECB             : 
    3922                 :     /* two-phase should not be altered */
    3923 GIC          55 :     Assert(newsub->twophasestate == MySubscription->twophasestate);
    3924                 : 
    3925 ECB             :     /*
    3926                 :      * Exit if any parameter that affects the remote connection was changed.
    3927                 :      * The launcher will start a new worker but note that the parallel apply
    3928                 :      * worker won't restart if the streaming option's value is changed from
    3929                 :      * 'parallel' to any other value or the server decides not to stream the
    3930                 :      * in-progress transaction.
    3931                 :      */
    3932 GIC          55 :     if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
    3933 GBC          54 :         strcmp(newsub->name, MySubscription->name) != 0 ||
    3934              53 :         strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
    3935 GIC          53 :         newsub->binary != MySubscription->binary ||
    3936              47 :         newsub->stream != MySubscription->stream ||
    3937 GNC          42 :         strcmp(newsub->origin, MySubscription->origin) != 0 ||
    3938 GIC          42 :         newsub->owner != MySubscription->owner ||
    3939 CBC          42 :         !equal(newsub->publications, MySubscription->publications))
    3940 ECB             :     {
    3941 GNC          29 :         if (am_parallel_apply_worker())
    3942 UNC           0 :             ereport(LOG,
    3943                 :                     (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
    3944                 :                             MySubscription->name)));
    3945                 :         else
    3946 GNC          29 :             ereport(LOG,
    3947                 :             /* translator: first %s is the name of logical replication worker */
    3948                 :                     (errmsg("%s for subscription \"%s\" will restart because of a parameter change",
    3949                 :                             get_worker_name(), MySubscription->name)));
    3950 ECB             : 
    3951 GNC          29 :         apply_worker_exit();
    3952                 :     }
    3953                 : 
    3954 ECB             :     /* Check for other changes that should never happen too. */
    3955 CBC          26 :     if (newsub->dbid != MySubscription->dbid)
    3956 ECB             :     {
    3957 UIC           0 :         elog(ERROR, "subscription %u changed unexpectedly",
    3958                 :              MyLogicalRepWorker->subid);
    3959 ECB             :     }
    3960                 : 
    3961                 :     /* Clean old subscription info and switch to new one. */
    3962 GIC          26 :     FreeSubscription(MySubscription);
    3963              26 :     MySubscription = newsub;
    3964                 : 
    3965              26 :     MemoryContextSwitchTo(oldctx);
    3966                 : 
    3967                 :     /* Change synchronous commit according to the user's wishes */
    3968              26 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    3969 ECB             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    3970                 : 
    3971 CBC          26 :     if (started_tx)
    3972              24 :         CommitTransactionCommand();
    3973                 : 
    3974              26 :     MySubscriptionValid = true;
    3975 ECB             : }
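maybe_reread_subscription sorts a catalog change into one of three outcomes: the subscription was disabled (stop), a connection-affecting parameter changed (exit so the launcher starts a fresh worker), or nothing relevant changed (swap in the new settings). A minimal sketch of that classification follows. `sub_params`, `sub_action`, and `classify_change` are hypothetical. As a simplification, the publication list is represented as one canonical string, whereas the listing compares Lists with equal(). The dbid sanity check is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical, trimmed-down view of the subscription fields compared above. */
typedef struct
{
    bool        enabled;
    const char *conninfo;
    const char *name;
    const char *slotname;
    bool        binary;
    char        stream;        /* streaming mode code */
    const char *origin;
    unsigned    owner;
    const char *publications;  /* simplification: canonical comma-separated list */
} sub_params;

typedef enum
{
    SUB_KEEP,           /* adopt the new settings and carry on */
    SUB_STOP_DISABLED,  /* subscription disabled: worker exits */
    SUB_RESTART         /* connection-affecting change: exit and be relaunched */
} sub_action;

static sub_action
classify_change(const sub_params *cur, const sub_params *upd)
{
    assert(cur != NULL && upd != NULL);

    if (!upd->enabled)
        return SUB_STOP_DISABLED;

    if (strcmp(upd->conninfo, cur->conninfo) != 0 ||
        strcmp(upd->name, cur->name) != 0 ||
        strcmp(upd->slotname, cur->slotname) != 0 ||
        upd->binary != cur->binary ||
        upd->stream != cur->stream ||
        strcmp(upd->origin, cur->origin) != 0 ||
        upd->owner != cur->owner ||
        strcmp(upd->publications, cur->publications) != 0)
        return SUB_RESTART;

    return SUB_KEEP;
}
```

Exiting and letting the launcher start a new worker is simpler than reconfiguring a live walsender connection in place. This is the trade-off the listing's comment describes.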
    3976                 : 
    3977 EUB             : /*
    3978                 :  * Callback from subscription syscache invalidation.
    3979                 :  */
    3980                 : static void
    3981 GIC          58 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    3982                 : {
    3983 CBC          58 :     MySubscriptionValid = false;
    3984 GIC          58 : }
    3985 ECB             : 
    3986                 : /*
    3987                 :  * subxact_info_write
    3988                 :  *    Store information about subxacts for a toplevel transaction.
    3989                 :  *
    3990                 :  * For each subxact we store the offset of its first change in the main file.
    3991                 :  * The file is always over-written as a whole.
    3992                 :  *
    3993                 :  * XXX We should only store subxacts that were not aborted yet.
    3994 EUB             :  */
    3995                 : static void
    3996 GIC         370 : subxact_info_write(Oid subid, TransactionId xid)
    3997                 : {
    3998                 :     char        path[MAXPGPATH];
    3999                 :     Size        len;
    4000                 :     BufFile    *fd;
    4001                 : 
    4002             370 :     Assert(TransactionIdIsValid(xid));
    4003                 : 
    4004                 :     /* construct the subxact filename */
    4005             370 :     subxact_filename(path, subid, xid);
    4006 ECB             : 
    4007                 :     /* Delete the subxacts file, if it exists. */
    4008 CBC         370 :     if (subxact_data.nsubxacts == 0)
    4009 ECB             :     {
    4010 CBC         288 :         cleanup_subxact_info();
    4011             288 :         BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    4012 ECB             : 
    4013 CBC         288 :         return;
    4014 ECB             :     }
    4015                 : 
    4016                 :     /*
    4017                 :      * Create the subxact file if it is not already created; otherwise open the
    4018                 :      * existing file.
    4019                 :      */
    4020 GIC          82 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
    4021                 :                             true);
    4022              82 :     if (fd == NULL)
    4023 CBC           8 :         fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
    4024 ECB             : 
    4025 GBC          82 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    4026                 : 
    4027 ECB             :     /* Write the subxact count and subxact info */
    4028 GIC          82 :     BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    4029 CBC          82 :     BufFileWrite(fd, subxact_data.subxacts, len);
    4030                 : 
    4031              82 :     BufFileClose(fd);
    4032                 : 
    4033 ECB             :     /* free the memory allocated for subxact info */
    4034 GIC          82 :     cleanup_subxact_info();
    4035                 : }
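The subxact file written above has a simple layout: the subxact count, followed by the raw array of `sizeof(SubXactInfo) * nsubxacts` bytes. subxact_info_read reads it back in the same order. The following is a minimal round-trip sketch that uses stdio in place of the BufFile/FileSet API. `subxact_rec` is a hypothetical mirror of SubXactInfo whose field types are assumptions. A short read is treated as an error, in the spirit of BufFileReadExact.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>

/* Hypothetical mirror of SubXactInfo; field types are assumptions. */
typedef struct
{
    uint32_t xid;     /* subtransaction XID */
    int      fileno;  /* changes-file segment, as reported by a tell call */
    off_t    offset;  /* offset of the subxact's first change */
} subxact_rec;

/* Rewrite from the start: the count, then the raw array. */
static int
write_subxacts(FILE *fp, uint32_t n, const subxact_rec *recs)
{
    assert(fp != NULL);
    rewind(fp);
    if (fwrite(&n, sizeof(n), 1, fp) != 1)
        return -1;
    if (n > 0 && fwrite(recs, sizeof(subxact_rec), n, fp) != n)
        return -1;
    return fflush(fp) == 0 ? 0 : -1;
}

/* Read the count, then exactly that many records (caller frees *recs). */
static int
read_subxacts(FILE *fp, uint32_t *n, subxact_rec **recs)
{
    assert(fp != NULL);
    rewind(fp);
    *recs = NULL;
    if (fread(n, sizeof(*n), 1, fp) != 1)
        return -1;
    if (*n == 0)
        return 0;
    *recs = malloc(sizeof(subxact_rec) * *n);
    if (*recs == NULL)
        return -1;
    if (fread(*recs, sizeof(subxact_rec), *n, fp) != *n)
    {
        free(*recs);
        *recs = NULL;
        return -1;
    }
    return 0;
}
```

Writing the structs byte-for-byte, including any padding, is only safe because the same process (and the same build) reads the file back. That assumption holds for these worker-local temporary files.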
    4036 ECB             : 
    4037                 : /*
    4038                 :  * subxact_info_read
    4039                 :  *    Restore information about subxacts of a streamed transaction.
    4040                 :  *
    4041                 :  * Read information about subxacts into the structure subxact_data that can be
    4042                 :  * used later.
    4043 EUB             :  */
    4044                 : static void
    4045 GIC         342 : subxact_info_read(Oid subid, TransactionId xid)
    4046                 : {
    4047 ECB             :     char        path[MAXPGPATH];
    4048                 :     Size        len;
    4049                 :     BufFile    *fd;
    4050                 :     MemoryContext oldctx;
    4051                 : 
    4052 GIC         342 :     Assert(!subxact_data.subxacts);
    4053             342 :     Assert(subxact_data.nsubxacts == 0);
    4054             342 :     Assert(subxact_data.nsubxacts_max == 0);
    4055                 : 
    4056                 :     /*
    4057 ECB             :      * If the subxact file doesn't exist that means we don't have any subxact
    4058                 :      * info.
    4059                 :      */
    4060 CBC         342 :     subxact_filename(path, subid, xid);
    4061 GIC         342 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
    4062                 :                             true);
    4063             342 :     if (fd == NULL)
    4064 CBC         263 :         return;
    4065                 : 
    4066 ECB             :     /* read number of subxact items */
    4067 GNC          79 :     BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    4068 ECB             : 
    4069 GIC          79 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    4070                 : 
    4071                 :     /* we keep the maximum as a power of 2 */
    4072              79 :     subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
    4073                 : 
    4074 ECB             :     /*
    4075                 :      * Allocate subxact information in the logical streaming context. We need
    4076 EUB             :      * this information during the complete stream so that we can add the sub
    4077                 :      * transaction info to this. On stream stop we will flush this information
    4078                 :      * to the subxact file and reset the logical streaming context.
    4079                 :      */
    4080 CBC          79 :     oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    4081              79 :     subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
    4082 ECB             :                                    sizeof(SubXactInfo));
    4083 CBC          79 :     MemoryContextSwitchTo(oldctx);
    4084                 : 
    4085              79 :     if (len > 0)
    4086 GNC          79 :         BufFileReadExact(fd, subxact_data.subxacts, len);
    4087                 : 
    4088 GIC          79 :     BufFileClose(fd);
    4089                 : }
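subxact_info_read sizes its array as `1 << my_log2(nsubxacts)`, that is, the smallest power of two that holds all the entries read. subxact_info_add later doubles the array when it is full, so the capacity stays a power of two. The sketch below assumes my_log2 returns ceil(log2(n)) (and 0 for n <= 1), which is our reading of how it is used here. `ceil_log2` and `capacity_for` are hypothetical helpers.

```c
#include <assert.h>
#include <stdint.h>

/* ceil(log2(n)), with 0 for n <= 1 (assumed to match my_log2's contract). */
static int
ceil_log2(uint32_t n)
{
    int      bits = 0;
    uint64_t cap = 1;

    while (cap < n)
    {
        cap <<= 1;
        bits++;
    }
    return bits;
}

/* Smallest power of two >= n (1 for n == 0). */
static uint32_t
capacity_for(uint32_t n)
{
    assert(n <= UINT32_C(0x80000000));   /* keep the shift below 32 */
    return UINT32_C(1) << ceil_log2(n);
}
```

For example, 5 entries get a capacity of 8 and 129 entries get 256.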
    4090                 : 
    4091                 : /*
    4092                 :  * subxact_info_add
    4093                 :  *    Add information about a subxact (offset in the main file).
    4094 ECB             :  */
    4095                 : static void
    4096 GIC      102513 : subxact_info_add(TransactionId xid)
    4097                 : {
    4098          102513 :     SubXactInfo *subxacts = subxact_data.subxacts;
    4099 ECB             :     int64       i;
    4100                 : 
    4101                 :     /* We must have a valid top level stream xid and a stream fd. */
    4102 CBC      102513 :     Assert(TransactionIdIsValid(stream_xid));
    4103 GIC      102513 :     Assert(stream_fd != NULL);
    4104 ECB             : 
    4105                 :     /*
    4106                 :      * If the XID matches the toplevel transaction, we don't want to add it.
    4107                 :      */
    4108 CBC      102513 :     if (stream_xid == xid)
    4109 GIC       92389 :         return;
    4110 ECB             : 
    4111                 :     /*
    4112                 :      * In most cases we're checking the same subxact as we've already seen in
    4113                 :      * the last call, so make sure to ignore it (this change comes later).
    4114                 :      */
    4115 GIC       10124 :     if (subxact_data.subxact_last == xid)
    4116           10048 :         return;
    4117                 : 
    4118                 :     /* OK, remember we're processing this XID. */
    4119              76 :     subxact_data.subxact_last = xid;
    4120                 : 
    4121 ECB             :     /*
    4122                 :      * Check if the transaction is already present in the array of subxact. We
    4123                 :      * intentionally scan the array from the tail, because we're likely adding
    4124                 :      * a change for the most recent subtransactions.
    4125                 :      *
    4126                 :      * XXX Can we rely on the subxact XIDs arriving in sorted order? That
    4127                 :      * would allow us to use binary search here.
    4128                 :      */
    4129 GIC          95 :     for (i = subxact_data.nsubxacts; i > 0; i--)
    4130                 :     {
    4131 ECB             :         /* found, so we're done */
    4132 CBC          76 :         if (subxacts[i - 1].xid == xid)
    4133 GIC          57 :             return;
    4134 ECB             :     }
    4135                 : 
    4136                 :     /* This is a new subxact, so we need to add it to the array. */
    4137 CBC          19 :     if (subxact_data.nsubxacts == 0)
    4138 ECB             :     {
    4139                 :         MemoryContext oldctx;
    4140                 : 
    4141 CBC           8 :         subxact_data.nsubxacts_max = 128;
    4142 ECB             : 
    4143                 :         /*
    4144                 :          * Allocate this memory for subxacts in per-stream context, see
    4145                 :          * subxact_info_read.
    4146                 :          */
    4147 GIC           8 :         oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    4148 CBC           8 :         subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    4149               8 :         MemoryContextSwitchTo(oldctx);
    4150 ECB             :     }
    4151 GIC          11 :     else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
    4152 ECB             :     {
    4153 CBC          10 :         subxact_data.nsubxacts_max *= 2;
    4154              10 :         subxacts = repalloc(subxacts,
    4155 GIC          10 :                             subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    4156 ECB             :     }
    4157                 : 
    4158 CBC          19 :     subxacts[subxact_data.nsubxacts].xid = xid;
    4159                 : 
    4160 ECB             :     /*
    4161                 :      * Get the current offset of the stream file and store it as offset of
    4162                 :      * this subxact.
    4163                 :      */
    4164 CBC          19 :     BufFileTell(stream_fd,
    4165              19 :                 &subxacts[subxact_data.nsubxacts].fileno,
    4166              19 :                 &subxacts[subxact_data.nsubxacts].offset);
    4167                 : 
    4168              19 :     subxact_data.nsubxacts++;
    4169              19 :     subxact_data.subxacts = subxacts;
    4170 ECB             : }
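The function above grows the subxact array geometrically. It starts at 128 entries, doubles when the array is full, and records the stream-file position where each new subtransaction's changes begin. The sketch below is a minimal standalone version under several assumptions:

- plain malloc/realloc stand in for palloc/repalloc;
- a single `long` offset stands in for BufFileTell's (fileno, offset) pair;
- the type and function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for SubXactInfo and the subxact_data bookkeeping. */
typedef struct
{
    uint32_t xid;       /* subtransaction ID */
    long     offset;    /* where its changes start in the stream file */
} SubXactEntry;

typedef struct
{
    SubXactEntry *entries;
    int           n;    /* entries in use */
    int           max;  /* allocated capacity */
} SubXactArray;

/*
 * Record xid at file position 'offset' unless it was already seen.
 * Returns 1 if a new entry was added, 0 if it was already present,
 * and -1 on allocation failure.
 */
static int
subxact_array_add(SubXactArray *arr, uint32_t xid, long offset)
{
    /* Search backwards: the most recent subxact is the likeliest match. */
    for (int i = arr->n; i > 0; i--)
    {
        if (arr->entries[i - 1].xid == xid)
            return 0;
    }

    if (arr->n == 0 && arr->entries == NULL)
    {
        /* First subxact of this stream: allocate the initial array. */
        arr->max = 128;
        arr->entries = malloc(arr->max * sizeof(SubXactEntry));
        if (arr->entries == NULL)
            return -1;
    }
    else if (arr->n == arr->max)
    {
        /* Array is full: double its capacity. */
        SubXactEntry *grown = realloc(arr->entries,
                                      2 * arr->max * sizeof(SubXactEntry));

        if (grown == NULL)
            return -1;
        arr->entries = grown;
        arr->max *= 2;
    }

    arr->entries[arr->n].xid = xid;
    arr->entries[arr->n].offset = offset;
    arr->n++;
    assert(arr->n <= arr->max);
    return 1;
}
```

Doubling keeps the amortized cost of each append constant. The backwards search is linear, which is acceptable because the `subxact_last` fast path in the original already filters out most repeated lookups.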
    4171                 : 
    4172 EUB             : /* format filename for file containing the info about subxacts */
    4173                 : static inline void
    4174 GIC         743 : subxact_filename(char *path, Oid subid, TransactionId xid)
    4175                 : {
    4176             743 :     snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
    4177             743 : }
    4178                 : 
    4179 EUB             : /* format filename for file containing serialized changes */
    4180                 : static inline void
    4181 CBC         436 : changes_filename(char *path, Oid subid, TransactionId xid)
    4182 ECB             : {
    4183 CBC         436 :     snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
    4184 GIC         436 : }
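Both helpers build a name of the form `<subid>-<xid>.<suffix>` inside the worker's fileset. As a result, one subscription's files for different toplevel transactions never collide. The standalone sketch below rests on these assumptions:

- MAXPGPATH is taken to be 1024, its usual value;
- Oid and TransactionId are modeled as `uint32_t`;
- the helper name is hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_MAXPGPATH 1024   /* assumed; mirrors MAXPGPATH */

/*
 * Format "<subid>-<xid>.<suffix>" into path, which must hold
 * SKETCH_MAXPGPATH bytes. Oid and TransactionId are both unsigned 32-bit.
 */
static void
stream_filename(char *path, uint32_t subid, uint32_t xid, const char *suffix)
{
    int n = snprintf(path, SKETCH_MAXPGPATH, "%u-%u.%s",
                     (unsigned) subid, (unsigned) xid, suffix);

    /* Two 32-bit IDs plus a short suffix can never be truncated. */
    assert(n > 0 && n < SKETCH_MAXPGPATH);
}
```

Usage: calling it with suffix `"changes"` or `"subxacts"` reproduces changes_filename and subxact_filename respectively.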
    4185 ECB             : 
    4186                 : /*
    4187                 :  * stream_cleanup_files
    4188                 :  *    Clean up files for a subscription / toplevel transaction.
    4189                 :  *
    4190                 :  * Remove files with serialized changes and subxact info for a particular
    4191                 :  * toplevel transaction. Each subscription has a separate set of files
    4192                 :  * for any toplevel transaction.
    4193                 :  */
    4194                 : void
    4195 CBC          31 : stream_cleanup_files(Oid subid, TransactionId xid)
    4196                 : {
    4197 ECB             :     char        path[MAXPGPATH];
    4198                 : 
    4199                 :     /* Delete the changes file. */
    4200 GIC          31 :     changes_filename(path, subid, xid);
    4201 CBC          31 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
    4202 ECB             : 
    4203                 :     /* Delete the subxact file, if it exists. */
    4204 GIC          31 :     subxact_filename(path, subid, xid);
    4205 CBC          31 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    4206              31 : }
    4207 ECB             : 
    4208                 : /*
    4209                 :  * stream_open_file
    4210                 :  *    Open a file that we'll use to serialize changes for a toplevel
    4211                 :  * transaction.
    4212                 :  *
    4213                 :  * Open a file for streamed changes from the toplevel transaction identified
    4214                 :  * by xid. If this is the first chunk of streamed changes for this
    4215                 :  * transaction (first_segment is true), create the buffile; otherwise,
    4216                 :  * open the previously created file.
    4217                 :  */
    4218                 : static void
    4219 GIC         361 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
    4220                 : {
    4221 ECB             :     char        path[MAXPGPATH];
    4222                 :     MemoryContext oldcxt;
    4223                 : 
    4224 GIC         361 :     Assert(OidIsValid(subid));
    4225             361 :     Assert(TransactionIdIsValid(xid));
    4226             361 :     Assert(stream_fd == NULL);
    4227                 : 
    4228                 : 
    4229             361 :     changes_filename(path, subid, xid);
    4230             361 :     elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
    4231                 : 
    4232                 :     /*
    4233                 :      * Create/open the buffiles under the logical streaming context so that we
    4234                 :      * have those files until stream stop.
    4235                 :      */
    4236             361 :     oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
    4237 ECB             : 
    4238                 :     /*
    4239                 :      * If this is the first streamed segment, create the changes file.
    4240                 :      * Otherwise, just open the file for writing, in append mode.
    4241                 :      */
    4242 GIC         361 :     if (first_segment)
    4243 CBC          32 :         stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
    4244 ECB             :                                          path);
    4245                 :     else
    4246                 :     {
    4247                 :         /*
    4248                 :          * Open the file and seek to the end of the file because we always
    4249                 :          * append the changes file.
    4250                 :          */
    4251 CBC         329 :         stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
    4252                 :                                        path, O_RDWR, false);
    4253             329 :         BufFileSeek(stream_fd, 0, 0, SEEK_END);
    4254                 :     }
    4255 ECB             : 
    4256 CBC         361 :     MemoryContextSwitchTo(oldcxt);
    4257             361 : }
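The first-segment logic in stream_open_file amounts to "create on first use, otherwise reopen and seek to the end". Below is a plain-stdio sketch of that behavior, under these assumptions:

- fopen/fseek stand in for BufFileCreateFileSet, BufFileOpenFileSet and BufFileSeek, which additionally split large files into segments within the fileset;
- the function name is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * Open the changes file for writing. On the first segment the file is
 * created (truncating any leftover). Later segments reopen it read/write
 * and position at the end, so new changes are always appended.
 */
static FILE *
open_changes_file(const char *path, bool first_segment)
{
    FILE *fp;

    assert(path != NULL);

    if (first_segment)
        return fopen(path, "w+b");

    fp = fopen(path, "r+b");        /* fails if the file was never created */
    if (fp != NULL && fseek(fp, 0, SEEK_END) != 0)
    {
        fclose(fp);
        return NULL;
    }
    return fp;
}
```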
    4258                 : 
    4259                 : /*
    4260                 :  * stream_close_file
    4261                 :  *    Close the currently open file with streamed changes.
    4262                 :  */
    4263 ECB             : static void
    4264 GIC         391 : stream_close_file(void)
    4265 ECB             : {
    4266 GIC         391 :     Assert(stream_fd != NULL);
    4267                 : 
    4268             391 :     BufFileClose(stream_fd);
    4269 ECB             : 
    4270 GIC         391 :     stream_fd = NULL;
    4271             391 : }
    4272                 : 
    4273                 : /*
    4274                 :  * stream_write_change
    4275 ECB             :  *    Serialize a change to a file for the current toplevel transaction.
    4276                 :  *
    4277                 :  * The change is serialized in a simple format, with length (not including
    4278                 :  * the length), action code (identifying the message type) and message
    4279                 :  * contents (without the subxact TransactionId value).
    4280                 :  */
    4281                 : static void
    4282 GIC      107554 : stream_write_change(char action, StringInfo s)
    4283 ECB             : {
    4284                 :     int         len;
    4285                 : 
    4286 GIC      107554 :     Assert(stream_fd != NULL);
    4287                 : 
    4288 ECB             :     /* total on-disk size, including the action type character */
    4289 CBC      107554 :     len = (s->len - s->cursor) + sizeof(char);
    4290 ECB             : 
    4291                 :     /* first write the size */
    4292 CBC      107554 :     BufFileWrite(stream_fd, &len, sizeof(len));
    4293 ECB             : 
    4294                 :     /* then the action */
    4295 GIC      107554 :     BufFileWrite(stream_fd, &action, sizeof(action));
    4296                 : 
    4297                 :     /* and finally the remaining part of the buffer (after the XID) */
    4298          107554 :     len = (s->len - s->cursor);
    4299 ECB             : 
    4300 GIC      107554 :     BufFileWrite(stream_fd, &s->data[s->cursor], len);
    4301 CBC      107554 : }
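Each record written by stream_write_change has the layout `[int len][char action][len - 1 payload bytes]`. Here `len` counts the action byte but not itself. The sketch below writes such records to a memory buffer and reads them back, under these assumptions:

- the buffer type and function names are hypothetical stand-ins for a BufFile;
- the `int` is stored in host byte order, as `BufFileWrite(stream_fd, &len, sizeof(len))` does.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size in-memory "file". */
typedef struct
{
    char   data[4096];
    size_t used;    /* bytes written so far */
    size_t pos;     /* read cursor */
} MemFile;

static void
mem_write(MemFile *f, const void *src, size_t n)
{
    assert(f->used + n <= sizeof(f->data));
    memcpy(f->data + f->used, src, n);
    f->used += n;
}

/* Append one change: length (action + payload), action byte, payload. */
static void
write_change(MemFile *f, char action, const char *payload, int payload_len)
{
    int len = payload_len + (int) sizeof(char);

    mem_write(f, &len, sizeof(len));
    mem_write(f, &action, sizeof(action));
    mem_write(f, payload, (size_t) payload_len);
}

/*
 * Read the next change. Copies the payload into buf (of size bufsize) and
 * returns its length, or -1 at end of data.
 */
static int
read_change(MemFile *f, char *action, char *buf, size_t bufsize)
{
    int len;

    if (f->pos + sizeof(len) > f->used)
        return -1;
    memcpy(&len, f->data + f->pos, sizeof(len));
    f->pos += sizeof(len);
    assert(len >= 1 && (size_t) (len - 1) <= bufsize);
    assert(f->pos + (size_t) len <= f->used);

    *action = f->data[f->pos];
    memcpy(buf, f->data + f->pos + 1, (size_t) (len - 1));
    f->pos += (size_t) len;
    return len - 1;
}
```

Because the length is written first, a reader can size its buffer before consuming the action and payload. Replaying the spooled changes relies on this property.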
    4302 ECB             : 
    4303                 : /*
    4304                 :  * stream_open_and_write_change
    4305                 :  *    Serialize a message to a file for the given transaction.
    4306                 :  *
    4307                 :  * This function is similar to stream_write_change except that it first
    4308                 :  * opens the target file if it is not already open, and closes the file
    4309                 :  * after writing the message.
    4310                 :  */
    4311                 : static void
    4312 GNC           5 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
    4313                 : {
    4314               5 :     Assert(!in_streamed_transaction);
    4315                 : 
    4316               5 :     if (!stream_fd)
    4317               5 :         stream_start_internal(xid, false);
    4318                 : 
    4319               5 :     stream_write_change(action, s);
    4320               5 :     stream_stop_internal(xid);
    4321               5 : }
    4322                 : 
    4323 ECB             : /*
    4324                 :  * Clean up the memory for subxacts and reset the related variables.
    4325                 :  */
    4326                 : static inline void
    4327 CBC         374 : cleanup_subxact_info(void)
    4328                 : {
    4329             374 :     if (subxact_data.subxacts)
    4330 GIC          87 :         pfree(subxact_data.subxacts);
    4331                 : 
    4332             374 :     subxact_data.subxacts = NULL;
    4333             374 :     subxact_data.subxact_last = InvalidTransactionId;
    4334             374 :     subxact_data.nsubxacts = 0;
    4335 CBC         374 :     subxact_data.nsubxacts_max = 0;
    4336 GIC         374 : }
    4337 ECB             : 
    4338                 : /*
    4339                 :  * Form the prepared transaction GID for two_phase transactions.
    4340                 :  *
    4341                 :  * Return the GID in the supplied buffer.
    4342                 :  */
    4343                 : static void
    4344 GIC          45 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
    4345                 : {
    4346 CBC          45 :     Assert(subid != InvalidRepOriginId);
    4347                 : 
    4348 GIC          45 :     if (!TransactionIdIsValid(xid))
    4349 UIC           0 :         ereport(ERROR,
    4350                 :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    4351                 :                  errmsg_internal("invalid two-phase transaction ID")));
    4352                 : 
    4353 GIC          45 :     snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
    4354 CBC          45 : }
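The GID depends only on the subscription OID and the remote transaction ID. The same remote prepared transaction therefore always maps to the same local GID. A minimal sketch, under these assumptions:

- InvalidTransactionId is 0;
- the caller supplies the buffer;
- an invalid xid is signaled by returning `false` rather than by raising a protocol-violation error;
- the function name is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in: InvalidTransactionId is 0 in PostgreSQL. */
#define SKETCH_INVALID_XID 0u

/* Build "pg_gid_<subid>_<xid>" in gid; return false if xid is invalid. */
static bool
two_phase_gid(uint32_t subid, uint32_t xid, char *gid, size_t szgid)
{
    int n;

    if (xid == SKETCH_INVALID_XID)
        return false;

    n = snprintf(gid, szgid, "pg_gid_%u_%u", (unsigned) subid, (unsigned) xid);
    assert(n > 0 && (size_t) n < szgid);
    return true;
}
```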
    4355                 : 
    4356                 : /*
    4357                 :  * Execute the initial sync with error handling. Disable the subscription
    4358                 :  * if required.
    4359 ECB             :  *
    4360                 :  * On return, the slot name is allocated in a long-lived context. Note that
    4361                 :  * we don't handle FATAL errors, which are probably caused by system resource
    4362                 :  * errors and are not repeatable.
    4363                 :  */
    4364                 : static void
    4365 CBC         156 : start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
    4366 ECB             : {
    4367 CBC         156 :     char       *syncslotname = NULL;
    4368 ECB             : 
    4369 GIC         156 :     Assert(am_tablesync_worker());
    4370                 : 
    4371             156 :     PG_TRY();
    4372 ECB             :     {
    4373                 :         /* Call initial sync. */
    4374 GIC         156 :         syncslotname = LogicalRepSyncTableStart(origin_startpos);
    4375                 :     }
    4376 CBC           7 :     PG_CATCH();
    4377 ECB             :     {
    4378 GIC           7 :         if (MySubscription->disableonerr)
    4379               1 :             DisableSubscriptionAndExit();
    4380 ECB             :         else
    4381                 :         {
    4382                 :             /*
    4383                 :              * Report the worker failed during table synchronization. Abort
    4384                 :              * the current transaction so that the stats message is sent in an
    4385                 :              * idle state.
    4386                 :              */
    4387 GIC           6 :             AbortOutOfAnyTransaction();
    4388               6 :             pgstat_report_subscription_error(MySubscription->oid, false);
    4389                 : 
    4390               6 :             PG_RE_THROW();
    4391 ECB             :         }
    4392                 :     }
    4393 CBC         149 :     PG_END_TRY();
    4394                 : 
    4395 ECB             :     /* allocate slot name in long-lived context */
    4396 GIC         149 :     *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
    4397 CBC         149 :     pfree(syncslotname);
    4398 GIC         149 : }
    4399 ECB             : 
    4400                 : /*
    4401                 :  * Run the apply loop with error handling. Disable the subscription
    4402                 :  * if required.
    4403                 :  *
    4404                 :  * Note that we don't handle FATAL errors, which are probably caused by
    4405                 :  * system resource errors and are not repeatable.
    4406                 :  */
    4407                 : static void
    4408 GIC         282 : start_apply(XLogRecPtr origin_startpos)
    4409                 : {
    4410 CBC         282 :     PG_TRY();
    4411 ECB             :     {
    4412 GIC         282 :         LogicalRepApplyLoop(origin_startpos);
    4413                 :     }
    4414 CBC          45 :     PG_CATCH();
    4415                 :     {
    4416              45 :         if (MySubscription->disableonerr)
    4417               3 :             DisableSubscriptionAndExit();
    4418 ECB             :         else
    4419                 :         {
    4420                 :             /*
    4421                 :              * Report the worker failed while applying changes. Abort the
    4422                 :              * current transaction so that the stats message is sent in an
    4423                 :              * idle state.
    4424                 :              */
    4425 GIC          42 :             AbortOutOfAnyTransaction();
    4426              42 :             pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
    4427                 : 
    4428              42 :             PG_RE_THROW();
    4429 ECB             :         }
    4430                 :     }
    4431 LBC           0 :     PG_END_TRY();
    4432 UIC           0 : }
    4433 ECB             : 
    4434                 : /*
    4435                 :  * Common initialization for leader apply worker and parallel apply worker.
    4436                 :  *
    4437                 :  * Initialize the database connection, in-memory subscription and necessary
    4438                 :  * config options.
    4439                 :  */
    4440                 : void
    4441 GNC         315 : InitializeApplyWorker(void)
    4442 EUB             : {
    4443 ECB             :     MemoryContext oldctx;
    4444                 : 
    4445                 :     /* Run as replica session replication role. */
    4446 GIC         315 :     SetConfigOption("session_replication_role", "replica",
    4447                 :                     PGC_SUSET, PGC_S_OVERRIDE);
    4448 ECB             : 
    4449                 :     /* Connect to our database. */
    4450 CBC         315 :     BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
    4451 GIC         315 :                                               MyLogicalRepWorker->userid,
    4452                 :                                               0);
    4453                 : 
    4454                 :     /*
    4455                 :      * Set always-secure search path, so malicious users can't redirect user
    4456                 :      * code (e.g. pg_index.indexprs).
    4457 ECB             :      */
    4458 CBC         312 :     SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
    4459                 : 
    4460                 :     /* Load the subscription into persistent memory context. */
    4461             312 :     ApplyContext = AllocSetContextCreate(TopMemoryContext,
    4462                 :                                          "ApplyContext",
    4463                 :                                          ALLOCSET_DEFAULT_SIZES);
    4464 GIC         312 :     StartTransactionCommand();
    4465 CBC         312 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    4466 ECB             : 
    4467 GIC         312 :     MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
    4468             312 :     if (!MySubscription)
    4469 ECB             :     {
    4470 LBC           0 :         ereport(LOG,
    4471                 :         /* translator: %s is the name of logical replication worker */
    4472                 :                 (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
    4473                 :                         get_worker_name(), MyLogicalRepWorker->subid)));
    4474                 : 
    4475                 :         /* Ensure we remove no-longer-useful entry for worker's start time */
    4476 UNC           0 :         if (!am_tablesync_worker() && !am_parallel_apply_worker())
    4477               0 :             ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    4478 UIC           0 :         proc_exit(0);
    4479                 :     }
    4480                 : 
    4481 GIC         312 :     MySubscriptionValid = true;
    4482             312 :     MemoryContextSwitchTo(oldctx);
    4483 ECB             : 
    4484 CBC         312 :     if (!MySubscription->enabled)
    4485                 :     {
    4486 LBC           0 :         ereport(LOG,
    4487                 :         /* translator: first %s is the name of logical replication worker */
    4488                 :                 (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup",
    4489                 :                         get_worker_name(), MySubscription->name)));
    4490                 : 
    4491 UNC           0 :         apply_worker_exit();
    4492                 :     }
    4493                 : 
    4494 ECB             :     /* Setup synchronous commit according to the user's wishes */
    4495 GIC         312 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    4496 ECB             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    4497                 : 
    4498                 :     /* Keep us informed about subscription changes. */
    4499 GIC         312 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
    4500 ECB             :                                   subscription_change_cb,
    4501                 :                                   (Datum) 0);
    4502                 : 
    4503 CBC         312 :     if (am_tablesync_worker())
    4504 GIC         156 :         ereport(LOG,
    4505                 :                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
    4506                 :                         MySubscription->name,
    4507                 :                         get_rel_name(MyLogicalRepWorker->relid))));
    4508                 :     else
    4509             156 :         ereport(LOG,
    4510                 :         /* translator: first %s is the name of logical replication worker */
    4511                 :                 (errmsg("%s for subscription \"%s\" has started",
    4512                 :                         get_worker_name(), MySubscription->name)));
    4513                 : 
    4514             312 :     CommitTransactionCommand();
    4515 GNC         312 : }
    4516                 : 
    4517                 : /* Logical Replication Apply worker entry point */
    4518                 : void
    4519             305 : ApplyWorkerMain(Datum main_arg)
    4520                 : {
    4521             305 :     int         worker_slot = DatumGetInt32(main_arg);
    4522                 :     char        originname[NAMEDATALEN];
    4523             305 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
    4524             305 :     char       *myslotname = NULL;
    4525                 :     WalRcvStreamOptions options;
    4526                 :     int         server_version;
    4527                 : 
    4528                 :     /* Attach to slot */
    4529             305 :     logicalrep_worker_attach(worker_slot);
    4530                 : 
    4531                 :     /* Setup signal handling */
    4532             305 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    4533             305 :     pqsignal(SIGTERM, die);
    4534             305 :     BackgroundWorkerUnblockSignals();
    4535                 : 
    4536                 :     /*
    4537                 :      * We don't currently need any ResourceOwner in a walreceiver process, but
    4538                 :      * if we did, we could call CreateAuxProcessResourceOwner here.
    4539                 :      */
    4540                 : 
    4541                 :     /* Initialise stats to a sanish value */
    4542             305 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
    4543             305 :         MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
    4544                 : 
    4545                 :     /* Load the libpq-specific functions */
    4546             305 :     load_file("libpqwalreceiver", false);
    4547                 : 
    4548             305 :     InitializeApplyWorker();
    4549                 : 
    4550                 :     /* Connect to the origin and start the replication. */
    4551 GIC         302 :     elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
    4552 ECB             :          MySubscription->conninfo);
    4553                 : 
    4554 GIC         302 :     if (am_tablesync_worker())
    4555                 :     {
    4556             156 :         start_table_sync(&origin_startpos, &myslotname);
    4557                 : 
    4558 GNC         149 :         ReplicationOriginNameForLogicalRep(MySubscription->oid,
    4559             149 :                                            MyLogicalRepWorker->relid,
    4560                 :                                            originname,
    4561                 :                                            sizeof(originname));
    4562             149 :         set_apply_error_context_origin(originname);
    4563 EUB             :     }
    4564                 :     else
    4565                 :     {
    4566                 :         /* This is the leader apply worker */
    4567                 :         RepOriginId originid;
    4568 ECB             :         TimeLineID  startpointTLI;
    4569                 :         char       *err;
    4570                 :         bool        must_use_password;
    4571                 : 
    4572 GIC         146 :         myslotname = MySubscription->slotname;
    4573 ECB             : 
    4574                 :         /*
    4575 EUB             :          * This shouldn't happen if the subscription is enabled, but guard
    4576                 :          * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
    4577                 :          * crash if slot is NULL.)
    4578                 :          */
    4579 GIC         146 :         if (!myslotname)
    4580 UIC           0 :             ereport(ERROR,
    4581 ECB             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    4582                 :                      errmsg("subscription has no replication slot set")));
    4583                 : 
    4584                 :         /* Setup replication origin tracking. */
    4585 GIC         146 :         StartTransactionCommand();
    4586 GNC         146 :         ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
    4587                 :                                            originname, sizeof(originname));
    4588 GIC         146 :         originid = replorigin_by_name(originname, true);
    4589             146 :         if (!OidIsValid(originid))
    4590 UIC           0 :             originid = replorigin_create(originname);
    4591 GNC         146 :         replorigin_session_setup(originid, 0);
    4592 CBC         146 :         replorigin_session_origin = originid;
    4593             146 :         origin_startpos = replorigin_session_get_progress(false);
    4594                 : 
    4595                 :         /* Is the use of a password mandatory? */
    4596 GNC         278 :         must_use_password = MySubscription->passwordrequired &&
    4597             132 :             !superuser_arg(MySubscription->owner);
    4598                 : 
    4599                 :         /* Note that the superuser_arg call can access the DB */
    4600 GIC         146 :         CommitTransactionCommand();
    4601                 : 
    4602             146 :         LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
    4603                 :                                                 must_use_password,
    4604                 :                                                 MySubscription->name, &err);
    4605 CBC         145 :         if (LogRepWorkerWalRcvConn == NULL)
    4606              12 :             ereport(ERROR,
    4607                 :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    4608                 :                      errmsg("could not connect to the publisher: %s", err)));
    4609 ECB             : 
    4610 EUB             :         /*
    4611                 :          * We don't really use the output of identify_system for anything, but
    4612                 :          * it does some initializations on the upstream, so let's still call it.
    4613                 :          */
    4614 GIC         133 :         (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
    4615                 : 
    4616 GNC         133 :         set_apply_error_context_origin(originname);
    4617                 :     }
    4618                 : 
    4619                 :     /*
    4620                 :      * Setup callback for syscache so that we know when something changes in
    4621                 :      * the subscription relation state.
    4622                 :      */
    4623 GIC         282 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
    4624                 :                                   invalidate_syncing_table_states,
    4625                 :                                   (Datum) 0);
    4626                 : 
    4627                 :     /* Build logical replication streaming options. */
    4628             282 :     options.logical = true;
    4629             282 :     options.startpoint = origin_startpos;
    4630             282 :     options.slotname = myslotname;
    4631                 : 
    4632 CBC         282 :     server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
    4633             282 :     options.proto.logical.proto_version =
    4634 GNC         282 :         server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
    4635                 :         server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
    4636                 :         server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
    4637 ECB             :         LOGICALREP_PROTO_VERSION_NUM;
    4638 EUB             : 
    4639 GIC         282 :     options.proto.logical.publication_names = MySubscription->publications;
    4640 CBC         282 :     options.proto.logical.binary = MySubscription->binary;
    4641                 : 
    4642                 :     /*
    4643                 :      * Assign the appropriate option value for streaming option according to
    4644                 :      * the 'streaming' mode and the publisher's ability to support that mode.
    4645                 :      */
    4646 GNC         282 :     if (server_version >= 160000 &&
    4647             282 :         MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
    4648                 :     {
    4649               7 :         options.proto.logical.streaming_str = "parallel";
    4650               7 :         MyLogicalRepWorker->parallel_apply = true;
    4651                 :     }
    4652             275 :     else if (server_version >= 140000 &&
    4653             275 :              MySubscription->stream != LOGICALREP_STREAM_OFF)
    4654                 :     {
    4655              26 :         options.proto.logical.streaming_str = "on";
    4656              26 :         MyLogicalRepWorker->parallel_apply = false;
    4657                 :     }
    4658                 :     else
    4659                 :     {
    4660             249 :         options.proto.logical.streaming_str = NULL;
    4661             249 :         MyLogicalRepWorker->parallel_apply = false;
    4662                 :     }
    4663                 : 
    4664 GIC         282 :     options.proto.logical.twophase = false;
    4665 GNC         282 :     options.proto.logical.origin = pstrdup(MySubscription->origin);
    4666                 : 
    4667 GIC         282 :     if (!am_tablesync_worker())
    4668                 :     {
    4669 ECB             :         /*
    4670                 :          * Even when the two_phase mode is requested by the user, it remains
    4671                 :          * as the tri-state PENDING until all tablesyncs have reached READY
    4672                 :          * state. Only then can it become ENABLED.
    4673 EUB             :          *
    4674                 :          * Note: If the subscription has no tables then leave the state as
    4675 ECB             :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
    4676                 :          * work.
    4677                 :          */
    4678 CBC         142 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
    4679 GIC           9 :             AllTablesyncsReady())
    4680                 :         {
    4681 ECB             :             /* Start streaming with two_phase enabled */
    4682 CBC           3 :             options.proto.logical.twophase = true;
    4683               3 :             walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    4684 ECB             : 
    4685 GIC           3 :             StartTransactionCommand();
    4686 CBC           3 :             UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
    4687               3 :             MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
    4688 GIC           3 :             CommitTransactionCommand();
    4689 ECB             :         }
    4690                 :         else
    4691                 :         {
    4692 GIC         130 :             walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    4693 ECB             :         }
    4694                 : 
    4695 GIC         133 :         ereport(DEBUG1,
    4696                 :                 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
    4697 ECB             :                         MySubscription->name,
    4698                 :                         MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
    4699                 :                         MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
    4700                 :                         MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
    4701                 :                         "?")));
    4702                 :     }
    4703                 :     else
    4704                 :     {
    4705                 :         /* Start normal logical streaming replication. */
    4706 CBC         149 :         walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    4707                 :     }
    4708                 : 
    4709                 :     /* Run the main loop. */
    4710 GIC         282 :     start_apply(origin_startpos);
    4711                 : 
    4712 LBC           0 :     proc_exit(0);
    4713                 : }
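The two_phase branch above promotes a PENDING subscription to ENABLED only after every table has finished its initial synchronization. Only at that point does it ask the publisher to stream with two_phase on. The sketch below is a minimal, standalone version of that decision. It assumes a plain C enum in place of the LOGICALREP_TWOPHASE_STATE_* values. `TwoPhaseState` and `next_twophase_state` are hypothetical names. The caller is assumed to pass `all_tablesyncs_ready = false` for a subscription with no tables, so that it stays PENDING as the note in the comment describes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of the tri-state two_phase setting. */
typedef enum
{
    TWOPHASE_DISABLED,
    TWOPHASE_PENDING,
    TWOPHASE_ENABLED
} TwoPhaseState;

/*
 * Return the state to store after startup, and set *request_two_phase when
 * this start of streaming should ask the publisher for two_phase.
 */
static TwoPhaseState
next_twophase_state(TwoPhaseState cur, bool all_tablesyncs_ready,
                    bool *request_two_phase)
{
    assert(request_two_phase != NULL);
    *request_two_phase = false;

    /* Only PENDING can advance, and only once every table sync is ready. */
    if (cur == TWOPHASE_PENDING && all_tablesyncs_ready)
    {
        *request_two_phase = true;
        return TWOPHASE_ENABLED;
    }

    /* Otherwise leave the stored state unchanged. */
    return cur;
}
```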
    4714                 : 
    4715 ECB             : /*
    4716                 :  * After error recovery, disable the subscription in a new transaction
    4717                 :  * and exit cleanly.
    4718                 :  */
    4719                 : static void
    4720 CBC           4 : DisableSubscriptionAndExit(void)
    4721                 : {
    4722                 :     /*
    4723                 :      * Emit the error message, and recover from the error state to an idle
    4724                 :      * state.
    4725                 :      */
    4726 GIC           4 :     HOLD_INTERRUPTS();
    4727 ECB             : 
    4728 GIC           4 :     EmitErrorReport();
    4729 CBC           4 :     AbortOutOfAnyTransaction();
    4730 GIC           4 :     FlushErrorState();
    4731                 : 
    4732               4 :     RESUME_INTERRUPTS();
    4733                 : 
    4734                 :     /* Report the worker failed during either table synchronization or apply */
    4735               4 :     pgstat_report_subscription_error(MyLogicalRepWorker->subid,
    4736               4 :                                      !am_tablesync_worker());
    4737                 : 
    4738                 :     /* Disable the subscription */
    4739 GBC           4 :     StartTransactionCommand();
    4740 GIC           4 :     DisableSubscription(MySubscription->oid);
    4741               4 :     CommitTransactionCommand();
    4742                 : 
    4743                 :     /* Ensure we remove no-longer-useful entry for worker's start time */
    4744 GNC           4 :     if (!am_tablesync_worker() && !am_parallel_apply_worker())
    4745               3 :         ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    4746                 : 
    4747                 :     /* Notify the subscription has been disabled and exit */
    4748 GIC           4 :     ereport(LOG,
    4749                 :             errmsg("subscription \"%s\" has been disabled because of an error",
    4750                 :                    MySubscription->name));
    4751                 : 
    4752 CBC           4 :     proc_exit(0);
    4753 ECB             : }
    4754                 : 
    4755                 : /*
    4756                 :  * Is current process a logical replication worker?
    4757                 :  */
    4758                 : bool
    4759 GIC        3003 : IsLogicalWorker(void)
    4760                 : {
    4761            3003 :     return MyLogicalRepWorker != NULL;
    4762 ECB             : }
    4763                 : 
    4764                 : /*
    4765                 :  * Is current process a logical replication parallel apply worker?
    4766                 :  */
    4767                 : bool
    4768 GNC        2606 : IsLogicalParallelApplyWorker(void)
    4769                 : {
    4770            2606 :     return IsLogicalWorker() && am_parallel_apply_worker();
    4771                 : }
    4772                 : 
    4773                 : /*
    4774                 :  * Start skipping changes of the transaction if the given LSN matches the
    4775 ECB             :  * LSN specified by subscription's skiplsn.
    4776                 :  */
    4777                 : static void
    4778 CBC         421 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
    4779 ECB             : {
    4780 GIC         421 :     Assert(!is_skipping_changes());
    4781             421 :     Assert(!in_remote_transaction);
    4782 CBC         421 :     Assert(!in_streamed_transaction);
    4783                 : 
    4784 ECB             :     /*
    4785                 :      * Quick return if it's not requested to skip this transaction. This
    4786                 :      * function is called for every remote transaction and we assume that
    4787                 :      * skipping the transaction is not used often.
    4788                 :      */
    4789 CBC         421 :     if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
    4790                 :                MySubscription->skiplsn != finish_lsn))
    4791             418 :         return;
    4792                 : 
    4793                 :     /* Start skipping all changes of this transaction */
    4794 GIC           3 :     skip_xact_finish_lsn = finish_lsn;
    4795                 : 
    4796               3 :     ereport(LOG,
    4797 ECB             :             errmsg("logical replication starts skipping transaction at LSN %X/%X",
    4798                 :                    LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
    4799 EUB             : }
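The skip messages print LSNs with the "%X/%X" format and LSN_FORMAT_ARGS. That macro supplies the high and low 32-bit halves of the 64-bit XLogRecPtr. The sketch below reproduces the same split outside PostgreSQL. The local `XLogRecPtr` typedef, the `LSN_HI`/`LSN_LO` macros and `format_lsn` are stand-ins written for illustration.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;    /* local stand-in for PostgreSQL's typedef */

/* Same split as LSN_FORMAT_ARGS: high 32 bits first, then low 32 bits. */
#define LSN_HI(lsn) ((uint32_t) ((lsn) >> 32))
#define LSN_LO(lsn) ((uint32_t) (lsn))

/* Render an LSN as "HI/LO" in uppercase hex without leading zeros. */
static void
format_lsn(XLogRecPtr lsn, char *buf, size_t len)
{
    assert(buf != NULL && len > 0);
    snprintf(buf, len, "%X/%X", (unsigned) LSN_HI(lsn), (unsigned) LSN_LO(lsn));
}
```

For example, `format_lsn(0x16B3748, buf, sizeof buf)` yields "0/16B3748" because the high half is zero.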
    4800                 : 
    4801                 : /*
    4802                 :  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
    4803                 :  */
    4804                 : static void
    4805 GBC          26 : stop_skipping_changes(void)
    4806 EUB             : {
    4807 GBC          26 :     if (!is_skipping_changes())
    4808 GIC          23 :         return;
    4809                 : 
    4810               3 :     ereport(LOG,
    4811 ECB             :             (errmsg("logical replication completed skipping transaction at LSN %X/%X",
    4812                 :                     LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
    4813                 : 
    4814                 :     /* Stop skipping changes */
    4815 GIC           3 :     skip_xact_finish_lsn = InvalidXLogRecPtr;
    4816                 : }
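maybe_start_skipping_changes() and stop_skipping_changes() form a small state machine around skip_xact_finish_lsn. Skipping starts only when the subscription's skip LSN is valid and equals the incoming transaction's finish LSN. It stops when the reset is requested at transaction end. The sketch below is minimal. It assumes a single global in place of skip_xact_finish_lsn and uses 0 as the invalid LSN, which is the value of InvalidXLogRecPtr in PostgreSQL. The function names are hypothetical, and the LOG messages are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define INVALID_LSN ((XLogRecPtr) 0)

/* Hypothetical stand-in for skip_xact_finish_lsn: non-zero while skipping. */
static XLogRecPtr skip_finish_lsn = INVALID_LSN;

static bool
is_skipping(void)
{
    return skip_finish_lsn != INVALID_LSN;
}

/* Begin skipping only if the configured skip LSN equals this finish LSN. */
static void
maybe_start_skipping(XLogRecPtr configured_skiplsn, XLogRecPtr finish_lsn)
{
    assert(!is_skipping());

    /* Common case: no skip requested, or it names another transaction. */
    if (configured_skiplsn == INVALID_LSN || configured_skiplsn != finish_lsn)
        return;

    skip_finish_lsn = finish_lsn;
}

/* Called when the transaction ends; a no-op unless we were skipping. */
static void
stop_skipping(void)
{
    if (!is_skipping())
        return;
    skip_finish_lsn = INVALID_LSN;
}
```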
    4817                 : 
    4818 ECB             : /*
    4819                 :  * Clear subskiplsn of pg_subscription catalog.
    4820                 :  *
    4821                 :  * finish_lsn is the transaction's finish LSN that is used to check if the
    4822                 :  * subskiplsn matches it. If not matched, we raise a warning when clearing the
    4823                 :  * subskiplsn in order to inform users of cases, e.g., where the user mistakenly
    4824                 :  * specified the wrong subskiplsn.
    4825                 :  */
    4826                 : static void
    4827 GIC         443 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
    4828                 : {
    4829                 :     Relation    rel;
    4830                 :     Form_pg_subscription subform;
    4831                 :     HeapTuple   tup;
    4832             443 :     XLogRecPtr  myskiplsn = MySubscription->skiplsn;
    4833             443 :     bool        started_tx = false;
    4834 ECB             : 
    4835 GNC         443 :     if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
    4836 CBC         440 :         return;
    4837 ECB             : 
    4838 CBC           3 :     if (!IsTransactionState())
    4839 ECB             :     {
    4840 CBC           1 :         StartTransactionCommand();
    4841               1 :         started_tx = true;
    4842                 :     }
    4843 ECB             : 
    4844 EUB             :     /*
    4845                 :      * Protect subskiplsn of pg_subscription from being concurrently updated
    4846                 :      * while clearing it.
    4847                 :      */
    4848 CBC           3 :     LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
    4849                 :                      AccessShareLock);
    4850                 : 
    4851 GIC           3 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    4852                 : 
    4853 ECB             :     /* Fetch the existing tuple. */
    4854 GIC           3 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
    4855                 :                               ObjectIdGetDatum(MySubscription->oid));
    4856                 : 
    4857 CBC           3 :     if (!HeapTupleIsValid(tup))
    4858 UIC           0 :         elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
    4859 EUB             : 
    4860 GIC           3 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
    4861                 : 
    4862                 :     /*
    4863                 :      * Clear the subskiplsn. If the user has already changed subskiplsn before
    4864 ECB             :      * clearing it we don't update the catalog and the replication origin
    4865                 :      * state won't get advanced. So in the worst case, if the server crashes
    4866                 :      * before sending an acknowledgment of the flush position the transaction
    4867                 :      * will be sent again and the user needs to set subskiplsn again. We can
    4868                 :      * reduce the possibility by logging a replication origin WAL record to
    4869                 :      * advance the origin LSN instead but there is no way to advance the
    4870                 :      * origin timestamp and it doesn't seem to be worth doing anything about
    4871                 :      * it since it's a very rare case.
    4872                 :      */
    4873 CBC           3 :     if (subform->subskiplsn == myskiplsn)
    4874 ECB             :     {
    4875                 :         bool        nulls[Natts_pg_subscription];
    4876                 :         bool        replaces[Natts_pg_subscription];
    4877                 :         Datum       values[Natts_pg_subscription];
    4878                 : 
    4879 GIC           3 :         memset(values, 0, sizeof(values));
    4880               3 :         memset(nulls, false, sizeof(nulls));
    4881               3 :         memset(replaces, false, sizeof(replaces));
    4882                 : 
    4883 ECB             :         /* reset subskiplsn */
    4884 GIC           3 :         values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
    4885 CBC           3 :         replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    4886 ECB             : 
    4887 GIC           3 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    4888                 :                                 replaces);
    4889               3 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    4890                 : 
    4891               3 :         if (myskiplsn != finish_lsn)
    4892 UIC           0 :             ereport(WARNING,
    4893                 :                     errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
    4894                 :                     errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
    4895                 :                               LSN_FORMAT_ARGS(finish_lsn),
    4896                 :                               LSN_FORMAT_ARGS(myskiplsn)));
    4897                 :     }
    4898 ECB             : 
    4899 GIC           3 :     heap_freetuple(tup);
    4900               3 :     table_close(rel, NoLock);
    4901                 : 
    4902               3 :     if (started_tx)
    4903               1 :         CommitTransactionCommand();
    4904 ECB             : }
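The comment in clear_subscription_skip_lsn() describes a compare-before-clear rule. The catalog value is reset only if it still equals the value the worker cached. A warning is raised when the cleared value did not match the transaction's finish LSN.

The sketch below isolates that rule. It assumes a plain variable in place of the pg_subscription tuple. It leaves out the shared-object lock, the transaction handling and the early return for parallel apply workers. `SkipLsnOutcome` and `clear_skip_lsn` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define INVALID_LSN ((XLogRecPtr) 0)

typedef enum
{
    SKIPLSN_NOTHING_TO_DO,      /* no skip LSN was cached */
    SKIPLSN_LEFT_ALONE,         /* catalog value changed meanwhile; untouched */
    SKIPLSN_CLEARED,            /* cleared, and it matched finish_lsn */
    SKIPLSN_CLEARED_WARN        /* cleared, but it did not match finish_lsn */
} SkipLsnOutcome;

/*
 * *catalog_skiplsn stands in for pg_subscription.subskiplsn, and
 * cached_skiplsn for the value the worker read earlier.
 */
static SkipLsnOutcome
clear_skip_lsn(XLogRecPtr *catalog_skiplsn, XLogRecPtr cached_skiplsn,
               XLogRecPtr finish_lsn)
{
    assert(catalog_skiplsn != NULL);

    if (cached_skiplsn == INVALID_LSN)
        return SKIPLSN_NOTHING_TO_DO;

    /* Clear only if nobody changed the value since it was cached. */
    if (*catalog_skiplsn != cached_skiplsn)
        return SKIPLSN_LEFT_ALONE;

    *catalog_skiplsn = INVALID_LSN;
    return cached_skiplsn == finish_lsn ? SKIPLSN_CLEARED : SKIPLSN_CLEARED_WARN;
}
```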
    4905                 : 
    4906                 : /* Error callback to give more context info about the change being applied */
    4907                 : void
    4908 GIC         599 : apply_error_callback(void *arg)
    4909                 : {
    4910 CBC         599 :     ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
    4911                 : 
    4912             599 :     if (apply_error_callback_arg.command == 0)
    4913             261 :         return;
    4914                 : 
    4915             338 :     Assert(errarg->origin_name);
    4916                 : 
    4917 GIC         338 :     if (errarg->rel == NULL)
    4918                 :     {
    4919             309 :         if (!TransactionIdIsValid(errarg->remote_xid))
    4920 UIC           0 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
    4921                 :                        errarg->origin_name,
    4922 ECB             :                        logicalrep_message_type(errarg->command));
    4923 GIC         309 :         else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    4924 CBC         257 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
    4925 ECB             :                        errarg->origin_name,
    4926                 :                        logicalrep_message_type(errarg->command),
    4927                 :                        errarg->remote_xid);
    4928                 :         else
    4929 GIC         104 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
    4930 ECB             :                        errarg->origin_name,
    4931                 :                        logicalrep_message_type(errarg->command),
    4932                 :                        errarg->remote_xid,
    4933 CBC          52 :                        LSN_FORMAT_ARGS(errarg->finish_lsn));
    4934                 :     }
    4935                 :     else
    4936                 :     {
    4937 GNC          29 :         if (errarg->remote_attnum < 0)
    4938                 :         {
    4939              29 :             if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    4940               2 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
    4941                 :                            errarg->origin_name,
    4942                 :                            logicalrep_message_type(errarg->command),
    4943               1 :                            errarg->rel->remoterel.nspname,
    4944               1 :                            errarg->rel->remoterel.relname,
    4945                 :                            errarg->remote_xid);
    4946                 :             else
    4947              56 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
    4948                 :                            errarg->origin_name,
    4949                 :                            logicalrep_message_type(errarg->command),
    4950              28 :                            errarg->rel->remoterel.nspname,
    4951              28 :                            errarg->rel->remoterel.relname,
    4952                 :                            errarg->remote_xid,
    4953              28 :                            LSN_FORMAT_ARGS(errarg->finish_lsn));
    4954                 :         }
    4955                 :         else
    4956                 :         {
    4957 UNC           0 :             if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    4958               0 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
    4959                 :                            errarg->origin_name,
    4960                 :                            logicalrep_message_type(errarg->command),
    4961               0 :                            errarg->rel->remoterel.nspname,
    4962               0 :                            errarg->rel->remoterel.relname,
    4963               0 :                            errarg->rel->remoterel.attnames[errarg->remote_attnum],
    4964                 :                            errarg->remote_xid);
    4965                 :             else
    4966               0 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
    4967                 :                            errarg->origin_name,
    4968                 :                            logicalrep_message_type(errarg->command),
    4969               0 :                            errarg->rel->remoterel.nspname,
    4970               0 :                            errarg->rel->remoterel.relname,
    4971               0 :                            errarg->rel->remoterel.attnames[errarg->remote_attnum],
    4972                 :                            errarg->remote_xid,
    4973               0 :                            LSN_FORMAT_ARGS(errarg->finish_lsn));
    4974                 :         }
    4975                 :     }
    4976                 : }
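apply_error_callback() chooses one of seven errcontext() formats. The choice depends on whether a target relation, a specific column, a valid remote xid and a valid finish LSN are known. The sketch below makes the same choice by appending optional clauses.

It assumes a simplified, hypothetical `ErrCtx` struct that carries a column name instead of remote_attnum/attnames, and uses 0 as the invalid xid and LSN. As in the original, the transaction clause is always printed when a relation is known, and otherwise only when the xid is valid.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

/* Hypothetical, trimmed-down version of ApplyErrorCallbackArg. */
typedef struct
{
    const char *origin_name;
    const char *command;        /* message type name; NULL = nothing applied */
    const char *nspname;        /* NULL when there is no target relation */
    const char *relname;
    const char *colname;        /* NULL when no specific column is involved */
    TransactionId remote_xid;   /* 0 = invalid */
    XLogRecPtr  finish_lsn;     /* 0 = invalid */
} ErrCtx;

/* Append formatted text to buf, truncating safely when it runs out of room. */
static void
append(char *buf, size_t len, const char *fmt, ...)
{
    size_t      used = strlen(buf);
    va_list     ap;

    if (used + 1 >= len)
        return;
    va_start(ap, fmt);
    vsnprintf(buf + used, len - used, fmt, ap);
    va_end(ap);
}

static void
build_context(const ErrCtx *c, char *buf, size_t len)
{
    int         has_rel = (c->relname != NULL);
    int         has_xact;

    assert(buf != NULL && len > 0);
    buf[0] = '\0';
    if (c->command == NULL)
        return;                 /* no command being applied: no context */

    append(buf, len,
           "processing remote data for replication origin \"%s\" during message type \"%s\"",
           c->origin_name, c->command);
    if (has_rel)
        append(buf, len, " for replication target relation \"%s.%s\"",
               c->nspname, c->relname);
    if (has_rel && c->colname != NULL)
        append(buf, len, " column \"%s\"", c->colname);

    /* With a relation the xid is always shown; otherwise only when valid. */
    has_xact = has_rel || c->remote_xid != 0;
    if (has_xact)
        append(buf, len, " in transaction %u", (unsigned) c->remote_xid);
    if (has_xact && c->finish_lsn != 0)
        append(buf, len, ", finished at %X/%X",
               (unsigned) (c->finish_lsn >> 32), (unsigned) c->finish_lsn);
}
```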
    4977                 : 
    4978 ECB             : /* Set transaction information of apply error callback */
    4979                 : static inline void
    4980 CBC        2695 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
    4981                 : {
    4982 GIC        2695 :     apply_error_callback_arg.remote_xid = xid;
    4983            2695 :     apply_error_callback_arg.finish_lsn = lsn;
    4984            2695 : }
    4985                 : 
    4986 ECB             : /* Reset all information of apply error callback */
    4987                 : static inline void
    4988 GIC        1332 : reset_apply_error_context_info(void)
    4989 ECB             : {
    4990 CBC        1332 :     apply_error_callback_arg.command = 0;
    4991 GIC        1332 :     apply_error_callback_arg.rel = NULL;
    4992            1332 :     apply_error_callback_arg.remote_attnum = -1;
    4993 CBC        1332 :     set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
    4994 GIC        1332 : }
    4995                 : 
    4996                 : /*
    4997                 :  * Request wakeup of the workers for the given subscription OID
    4998                 :  * at commit of the current transaction.
    4999                 :  *
    5000                 :  * This is used to ensure that the workers process assorted changes
    5001                 :  * as soon as possible.
    5002                 :  */
    5003                 : void
    5004 GNC         160 : LogicalRepWorkersWakeupAtCommit(Oid subid)
    5005                 : {
    5006                 :     MemoryContext oldcxt;
    5007                 : 
    5008             160 :     oldcxt = MemoryContextSwitchTo(TopTransactionContext);
    5009             160 :     on_commit_wakeup_workers_subids =
    5010             160 :         list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
    5011             160 :     MemoryContextSwitchTo(oldcxt);
    5012             160 : }
    5013                 : 
    5014                 : /*
    5015                 :  * Wake up the workers of any subscriptions that were changed in this xact.
    5016                 :  */
    5017                 : void
    5018          485916 : AtEOXact_LogicalRepWorkers(bool isCommit)
    5019                 : {
    5020          485916 :     if (isCommit && on_commit_wakeup_workers_subids != NIL)
    5021                 :     {
    5022                 :         ListCell   *lc;
    5023                 : 
    5024             156 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    5025             312 :         foreach(lc, on_commit_wakeup_workers_subids)
    5026                 :         {
    5027             156 :             Oid         subid = lfirst_oid(lc);
    5028                 :             List       *workers;
    5029                 :             ListCell   *lc2;
    5030                 : 
    5031             156 :             workers = logicalrep_workers_find(subid, true);
    5032             207 :             foreach(lc2, workers)
    5033                 :             {
    5034              51 :                 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
    5035                 : 
    5036              51 :                 logicalrep_worker_wakeup_ptr(worker);
    5037                 :             }
    5038                 :         }
    5039             156 :         LWLockRelease(LogicalRepWorkerLock);
    5040                 :     }
    5041                 : 
    5042                 :     /* The List storage will be reclaimed automatically in xact cleanup. */
    5043          485916 :     on_commit_wakeup_workers_subids = NIL;
    5044          485916 : }
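LogicalRepWorkersWakeupAtCommit() and AtEOXact_LogicalRepWorkers() implement a deferred-notification pattern. During the transaction, subscription OIDs are collected without duplicates. The matching workers are woken only if the transaction commits, and the list is discarded either way.

The sketch below is single-threaded. It assumes a fixed-size array in place of a List in TopTransactionContext. It also assumes a hypothetical `wake_workers_of()` hook that only counts calls. The real code holds LogicalRepWorkerLock and wakes every worker found for each subscription.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;

#define MAX_PENDING 16          /* hypothetical fixed capacity for the sketch */

static Oid  pending_subids[MAX_PENDING];
static int  npending = 0;

/* Like list_append_unique_oid: remember each subid once per transaction. */
static void
wakeup_at_commit(Oid subid)
{
    for (int i = 0; i < npending; i++)
        if (pending_subids[i] == subid)
            return;
    assert(npending < MAX_PENDING);
    pending_subids[npending++] = subid;
}

/* Hypothetical wakeup hook; here it only counts one call per subscription. */
static int  wakeups_sent = 0;

static void
wake_workers_of(Oid subid)
{
    (void) subid;
    wakeups_sent++;
}

/* End of transaction: wake only on commit, but always forget the list. */
static void
at_eoxact(bool is_commit)
{
    if (is_commit)
        for (int i = 0; i < npending; i++)
            wake_workers_of(pending_subids[i]);
    npending = 0;
}
```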
    5045                 : 
    5046                 : /*
    5047                 :  * Allocate the origin name in long-lived context for error context message.
    5048                 :  */
    5049                 : void
    5050             292 : set_apply_error_context_origin(char *originname)
    5051                 : {
    5052             292 :     apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
    5053                 :                                                                originname);
    5054             292 : }
    5055                 : 
    5056                 : /*
    5057                 :  * Return the action to be taken for the given transaction. See
    5058                 :  * TransApplyAction for information on each of the actions.
    5059                 :  *
    5060                 :  * *winfo is assigned to the destination parallel worker info when the leader
    5061                 :  * apply worker has to pass all the transaction's changes to the parallel
    5062                 :  * apply worker.
    5063                 :  */
    5064                 : static TransApplyAction
    5065          325903 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
    5066                 : {
    5067          325903 :     *winfo = NULL;
    5068                 : 
    5069          325903 :     if (am_parallel_apply_worker())
    5070                 :     {
    5071           68910 :         return TRANS_PARALLEL_APPLY;
    5072                 :     }
    5073                 : 
    5074                 :     /*
    5075                 :      * If we are processing this transaction using a parallel apply worker,
    5076                 :      * then either send the changes to that worker or, if the worker is busy,
    5077                 :      * serialize the changes to a file which will later be processed by the
    5078                 :      * parallel worker.
    5079                 :      */
    5080          256993 :     *winfo = pa_find_worker(xid);
    5081                 : 
    5082          256993 :     if (*winfo && (*winfo)->serialize_changes)
    5083                 :     {
    5084            5037 :         return TRANS_LEADER_PARTIAL_SERIALIZE;
    5085                 :     }
    5086          251956 :     else if (*winfo)
    5087                 :     {
    5088           68894 :         return TRANS_LEADER_SEND_TO_PARALLEL;
    5089                 :     }
    5090                 : 
    5091                 :     /*
    5092                 :      * If there is no parallel worker involved to process this transaction then
    5093                 :      * we either directly apply the change or serialize it to a file which will
    5094                 :      * later be applied when the transaction finish message is processed.
    5095                 :      */
    5096          183062 :     else if (in_streamed_transaction)
    5097                 :     {
    5098          103195 :         return TRANS_LEADER_SERIALIZE;
    5099                 :     }
    5100                 :     else
    5101                 :     {
    5102           79867 :         return TRANS_LEADER_APPLY;
    5103                 :     }
    5104                 : }
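get_transaction_apply_action() reduces to a small decision table over three inputs:

- whether the current process is a parallel apply worker;
- whether a parallel worker is assigned to the transaction and, if so, whether it must serialize changes;
- whether a streamed transaction is in progress.

The sketch below encodes that table as a pure function. It assumes the caller has already done the pa_find_worker() lookup. `PaWorker` and `choose_action` are hypothetical. The enum is a local copy whose numeric values are illustrative only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Local copy of the action names; numeric values here are illustrative. */
typedef enum
{
    TRANS_LEADER_APPLY,
    TRANS_LEADER_SERIALIZE,
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_LEADER_PARTIAL_SERIALIZE,
    TRANS_PARALLEL_APPLY
} TransApplyAction;

/* Hypothetical stand-in for ParallelApplyWorkerInfo. */
typedef struct
{
    bool        serialize_changes;  /* leader must spool changes to a file */
} PaWorker;

/* winfo is the result of the (assumed) per-transaction worker lookup. */
static TransApplyAction
choose_action(bool am_parallel_worker, const PaWorker *winfo,
              bool in_streamed_transaction)
{
    if (am_parallel_worker)
        return TRANS_PARALLEL_APPLY;

    /* A parallel worker owns this transaction: send, or spool if busy. */
    if (winfo != NULL)
        return winfo->serialize_changes ? TRANS_LEADER_PARTIAL_SERIALIZE
            : TRANS_LEADER_SEND_TO_PARALLEL;

    /* No parallel worker: spool streamed changes, apply the rest directly. */
    return in_streamed_transaction ? TRANS_LEADER_SERIALIZE : TRANS_LEADER_APPLY;
}
```

Keeping the lookup outside the function makes the table easy to check case by case. The original instead calls pa_find_worker() inline and returns the worker through *winfo.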
        

Generated by: LCOV version v1.16-55-g56c0a2a