LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 17:13:01
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

Coverage Total Hit LBC UIC GBC GIC GNC CBC EUB ECB DCB
Lines: 93.0 % 487 453 10 24 8 274 43 128 26 297 18
Functions: 100.0 % 16 16 16 15 1

Line coverage date bins:
[..60] days: 100.0 % 9 9 9
(60,120] days: 100.0 % 13 13 13
(120,180] days: 100.0 % 5 5 5
(180,240] days: 100.0 % 23 23 7 15 1 4
(240..) days: 92.2 % 437 403 10 24 8 267 1 127 25 268

Function coverage date bins:
(240..) days: 55.2 % 29 16 16 13

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  * tablesync.c
                                  3                 :  *    PostgreSQL logical replication: initial table data synchronization
                                  4                 :  *
                                  5                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
                                  6                 :  *
                                  7                 :  * IDENTIFICATION
                                  8                 :  *    src/backend/replication/logical/tablesync.c
                                  9                 :  *
                                 10                 :  * NOTES
                                 11                 :  *    This file contains code for initial table data synchronization for
                                 12                 :  *    logical replication.
                                 13                 :  *
                                 14                 :  *    The initial data synchronization is done separately for each table,
                                 15                 :  *    in a separate apply worker that only fetches the initial snapshot data
                                 16                 :  *    from the publisher and then synchronizes the position in the stream with
                                 17                 :  *    the leader apply worker.
                                 18                 :  *
                                 19                 :  *    There are several reasons for doing the synchronization this way:
                                 20                 :  *     - It allows us to parallelize the initial data synchronization
                                 21                 :  *       which lowers the time needed for it to happen.
                                 22                 :  *     - The initial synchronization does not have to hold the xid and LSN
                                 23                 :  *       for the time it takes to copy data of all tables, causing less
                                 24                 :  *       bloat and lower disk consumption compared to doing the
                                 25                 :  *       synchronization in a single process for the whole database.
                                 26                 :  *     - It allows us to synchronize any tables added after the initial
                                 27                 :  *       synchronization has finished.
                                 28                 :  *
                                 29                 :  *    The stream position synchronization works in multiple steps:
                                 30                 :  *     - Apply worker requests a tablesync worker to start, setting the new
                                 31                 :  *       table state to INIT.
                                 32                 :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
                                 33                 :  *       copying.
                                 34                 :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
                                 35                 :  *       worker specific) state to indicate when the copy phase has completed, so
                                 36                 :  *       if the worker crashes with this (non-memory) state then the copy will not
                                 37                 :  *       be re-attempted.
                                 38                 :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
                                 39                 :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
                                 40                 :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
                                 41                 :  *       until either the table state is set to SYNCDONE or the sync worker
                                 42                 :  *       exits.
                                 43                 :  *     - After the sync worker has seen the state change to CATCHUP, it will
                                 44                 :  *       read the stream and apply changes (acting like an apply worker) until
                                 45                 :  *       it catches up to the specified stream position.  Then it sets the
                                 46                 :  *       state to SYNCDONE.  There might be zero changes applied between
                                 47                 :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
                                 48                 :  *       apply worker.
                                 49                 :  *     - Once the state is set to SYNCDONE, the apply worker will continue tracking
                                 50                 :  *       the table until it reaches the SYNCDONE stream position, at which
                                 51                 :  *       point it sets state to READY and stops tracking.  Again, there might
                                 52                 :  *       be zero changes in between.
                                 53                 :  *
                                 54                 :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
                                 55                 :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
                                 56                 :  *
                                 57                 :  *    The catalog pg_subscription_rel is used to keep information about
                                 58                 :  *    subscribed tables and their state.  The catalog holds all states
                                 59                 :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
                                 60                 :  *
                                 61                 :  *    Example flows look like this:
                                 62                 :  *     - Apply is in front:
                                 63                 :  *        sync:8
                                 64                 :  *          -> set in catalog FINISHEDCOPY
                                 65                 :  *          -> set in memory SYNCWAIT
                                 66                 :  *        apply:10
                                 67                 :  *          -> set in memory CATCHUP
                                 68                 :  *          -> enter wait-loop
                                 69                 :  *        sync:10
                                 70                 :  *          -> set in catalog SYNCDONE
                                 71                 :  *          -> exit
                                 72                 :  *        apply:10
                                 73                 :  *          -> exit wait-loop
                                 74                 :  *          -> continue rep
                                 75                 :  *        apply:11
                                 76                 :  *          -> set in catalog READY
                                 77                 :  *
                                 78                 :  *     - Sync is in front:
                                 79                 :  *        sync:10
                                 80                 :  *          -> set in catalog FINISHEDCOPY
                                 81                 :  *          -> set in memory SYNCWAIT
                                 82                 :  *        apply:8
                                 83                 :  *          -> set in memory CATCHUP
                                 84                 :  *          -> continue per-table filtering
                                 85                 :  *        sync:10
                                 86                 :  *          -> set in catalog SYNCDONE
                                 87                 :  *          -> exit
                                 88                 :  *        apply:10
                                 89                 :  *          -> set in catalog READY
                                 90                 :  *          -> stop per-table filtering
                                 91                 :  *          -> continue rep
                                 92                 :  *-------------------------------------------------------------------------
                                 93                 :  */
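
The state progression described in the header comment above can be sketched as a small, standalone C model. This is only an illustration under stated assumptions: the SyncState enum and the helper functions sync_state_next() and sync_state_in_catalog() are hypothetical names invented here and are not part of tablesync.c, which uses the SUBREL_STATE_* constants instead.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical enum for illustration only; it lists the states in the
 * order given by the header comment:
 * INIT -> DATASYNC -> FINISHEDCOPY -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY
 */
typedef enum SyncState
{
    SYNC_INIT,
    SYNC_DATASYNC,
    SYNC_FINISHEDCOPY,
    SYNC_SYNCWAIT,
    SYNC_CATCHUP,
    SYNC_SYNCDONE,
    SYNC_READY
} SyncState;

/* Return the next state in the fixed progression; READY is terminal. */
static SyncState
sync_state_next(SyncState s)
{
    assert((int) s >= (int) SYNC_INIT && (int) s <= (int) SYNC_READY);
    return (s == SYNC_READY) ? SYNC_READY : (SyncState) ((int) s + 1);
}

/*
 * Per the header comment, the catalog holds every state except SYNCWAIT
 * and CATCHUP, which exist only in shared memory.
 */
static bool
sync_state_in_catalog(SyncState s)
{
    return s != SYNC_SYNCWAIT && s != SYNC_CATCHUP;
}
```

Under this model, a worker that crashes in FINISHEDCOPY can find that state again after restart, because it is a catalog state, whereas SYNCWAIT and CATCHUP would be lost with shared memory.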
                                 94                 : 
                                 95                 : #include "postgres.h"
                                 96                 : 
                                 97                 : #include "access/table.h"
                                 98                 : #include "access/xact.h"
                                 99                 : #include "catalog/indexing.h"
                                100                 : #include "catalog/pg_subscription_rel.h"
                                101                 : #include "catalog/pg_type.h"
                                102                 : #include "commands/copy.h"
                                103                 : #include "miscadmin.h"
                                104                 : #include "nodes/makefuncs.h"
                                105                 : #include "parser/parse_relation.h"
                                106                 : #include "pgstat.h"
                                107                 : #include "replication/logicallauncher.h"
                                108                 : #include "replication/logicalrelation.h"
                                109                 : #include "replication/walreceiver.h"
                                110                 : #include "replication/worker_internal.h"
                                111                 : #include "replication/slot.h"
                                112                 : #include "replication/origin.h"
                                113                 : #include "storage/ipc.h"
                                114                 : #include "storage/lmgr.h"
                                115                 : #include "utils/acl.h"
                                116                 : #include "utils/array.h"
                                117                 : #include "utils/builtins.h"
                                118                 : #include "utils/lsyscache.h"
                                119                 : #include "utils/memutils.h"
                                120                 : #include "utils/rls.h"
                                121                 : #include "utils/snapmgr.h"
                                122                 : #include "utils/syscache.h"
                                123                 : 
                                124                 : static bool table_states_valid = false;
                                125                 : static List *table_states_not_ready = NIL;
                                126                 : static bool FetchTableStates(bool *started_tx);
                                127                 : 
                                128                 : static StringInfo copybuf = NULL;
                                129                 : 
                                130                 : /*
                                131                 :  * Exit routine for synchronization worker.
                                132                 :  */
                                133                 : static void
                                134                 : pg_attribute_noreturn()
 2208 peter_e                   135 GIC         148 : finish_sync_worker(void)
 2208 peter_e                   136 ECB             : {
                                137                 :     /*
                                138                 :      * Commit any outstanding transaction. This is the usual case, unless
                                139                 :      * there was nothing to do for the table.
                                140                 :      */
 2208 peter_e                   141 GIC         148 :     if (IsTransactionState())
 2186 peter_e                   142 ECB             :     {
 2208 peter_e                   143 GIC         148 :         CommitTransactionCommand();
  368 andres                    144 CBC         148 :         pgstat_report_stat(true);
 2186 peter_e                   145 ECB             :     }
                                146                 : 
                                147                 :     /* And flush all writes. */
 2208 peter_e                   148 GIC         148 :     XLogFlush(GetXLogWriteRecPtr());
 2208 peter_e                   149 ECB             : 
 2146 peter_e                   150 GIC         148 :     StartTransactionCommand();
 2208 peter_e                   151 CBC         148 :     ereport(LOG,
 2146 peter_e                   152 ECB             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
                                153                 :                     MySubscription->name,
                                154                 :                     get_rel_name(MyLogicalRepWorker->relid))));
 2146 peter_e                   155 GIC         148 :     CommitTransactionCommand();
 2208 peter_e                   156 ECB             : 
                                157                 :     /* Find the leader apply worker and signal it. */
 2133 peter_e                   158 GIC         148 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 2133 peter_e                   159 ECB             : 
                                160                 :     /* Stop gracefully */
 2208 peter_e                   161 GIC         148 :     proc_exit(0);
 2208 peter_e                   162 ECB             : }
                                163                 : 
                                164                 : /*
                                165                 :  * Wait until the relation sync state is set in the catalog to the expected
                                166                 :  * one; return true when it happens.
                                167                 :  *
                                168                 :  * Returns false if the table sync worker or the table itself has
                                169                 :  * disappeared, or the table state has been reset.
                                170                 :  *
                                171                 :  * Currently, this is used in the apply worker when transitioning from
                                172                 :  * CATCHUP state to SYNCDONE.
                                173                 :  */
                                174                 : static bool
 2133 peter_e                   175 GIC         145 : wait_for_relation_state_change(Oid relid, char expected_state)
 2208 peter_e                   176 ECB             : {
                                177                 :     char        state;
                                178                 : 
                                179                 :     for (;;)
 2208 peter_e                   180 GIC         147 :     {
 2153 bruce                     181 ECB             :         LogicalRepWorker *worker;
                                182                 :         XLogRecPtr  statelsn;
                                183                 : 
 2137 peter_e                   184 GIC         292 :         CHECK_FOR_INTERRUPTS();
 2137 peter_e                   185 ECB             : 
  906 alvherre                  186 GIC         292 :         InvalidateCatalogSnapshot();
 2133 peter_e                   187 CBC         292 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
  906 alvherre                  188 ECB             :                                         relid, &statelsn);
                                189                 : 
 2133 peter_e                   190 GIC         292 :         if (state == SUBREL_STATE_UNKNOWN)
  906 alvherre                  191 LBC           0 :             break;
 2133 peter_e                   192 EUB             : 
 2133 peter_e                   193 GIC         292 :         if (state == expected_state)
 2133 peter_e                   194 CBC         145 :             return true;
 2133 peter_e                   195 ECB             : 
                                196                 :         /* Check if the sync worker is still running and bail if not. */
 2208 peter_e                   197 GIC         147 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
  906 alvherre                  198 CBC         147 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
 2136 peter_e                   199 ECB             :                                         false);
 2133 peter_e                   200 GIC         147 :         LWLockRelease(LogicalRepWorkerLock);
 2208 peter_e                   201 CBC         147 :         if (!worker)
  906 alvherre                  202 LBC           0 :             break;
 2136 peter_e                   203 EUB             : 
 1598 tmunro                    204 GIC         147 :         (void) WaitLatch(MyLatch,
 1598 tmunro                    205 ECB             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                206                 :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                207                 : 
 2133 andres                    208 GIC         147 :         ResetLatch(MyLatch);
 2133 peter_e                   209 ECB             :     }
                                210                 : 
 2133 peter_e                   211 UIC           0 :     return false;
 2133 peter_e                   212 EUB             : }
                                213                 : 
                                214                 : /*
                                215                 :  * Wait until the apply worker changes the state of our synchronization
                                216                 :  * worker to the expected one.
                                217                 :  *
                                218                 :  * Used when transitioning from SYNCWAIT state to CATCHUP.
                                219                 :  *
                                220                 :  * Returns false if the apply worker has disappeared.
                                221                 :  */
                                222                 : static bool
 2133 peter_e                   223 GIC         149 : wait_for_worker_state_change(char expected_state)
 2133 peter_e                   224 ECB             : {
                                225                 :     int         rc;
                                226                 : 
                                227                 :     for (;;)
 2133 peter_e                   228 GIC         149 :     {
 2133 peter_e                   229 ECB             :         LogicalRepWorker *worker;
                                230                 : 
 2133 peter_e                   231 GIC         298 :         CHECK_FOR_INTERRUPTS();
 2133 peter_e                   232 ECB             : 
                                233                 :         /*
                                234                 :          * Done if already in correct state.  (We assume this fetch is atomic
                                235                 :          * enough to not give a misleading answer if we do it with no lock.)
                                236                 :          */
 2109 tgl                       237 GIC         298 :         if (MyLogicalRepWorker->relstate == expected_state)
 2109 tgl                       238 CBC         149 :             return true;
 2109 tgl                       239 ECB             : 
                                240                 :         /*
                                241                 :          * Bail out if the apply worker has died, else signal it we're
                                242                 :          * waiting.
                                243                 :          */
 2133 peter_e                   244 GIC         149 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 2133 peter_e                   245 CBC         149 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
 2133 peter_e                   246 ECB             :                                         InvalidOid, false);
 2109 tgl                       247 GIC         149 :         if (worker && worker->proc)
 2109 tgl                       248 CBC         149 :             logicalrep_worker_wakeup_ptr(worker);
 2133 peter_e                   249             149 :         LWLockRelease(LogicalRepWorkerLock);
                                250             149 :         if (!worker)
 2109 tgl                       251 LBC           0 :             break;
 2208 peter_e                   252 EUB             : 
                                253                 :         /*
                                254                 :          * Wait.  We expect to get a latch signal back from the apply worker,
                                255                 :          * but use a timeout in case it dies without sending one.
                                256                 :          */
 2133 andres                    257 GIC         149 :         rc = WaitLatch(MyLatch,
 1598 tmunro                    258 ECB             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                259                 :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                260                 : 
 2109 tgl                       261 GIC         149 :         if (rc & WL_LATCH_SET)
 2109 tgl                       262 CBC         149 :             ResetLatch(MyLatch);
 2208 peter_e                   263 ECB             :     }
                                264                 : 
 2208 peter_e                   265 UIC           0 :     return false;
 2208 peter_e                   266 EUB             : }
                                267                 : 
                                268                 : /*
                                269                 :  * Callback from syscache invalidation.
                                270                 :  */
                                271                 : void
 2208 peter_e                   272 GIC        1038 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 2208 peter_e                   273 ECB             : {
 2208 peter_e                   274 GIC        1038 :     table_states_valid = false;
 2208 peter_e                   275 CBC        1038 : }
 2208 peter_e                   276 ECB             : 
                                277                 : /*
                                278                 :  * Handle table synchronization cooperation from the synchronization
                                279                 :  * worker.
                                280                 :  *
                                281                 :  * If the sync worker is in CATCHUP state and reached (or passed) the
                                282                 :  * predetermined synchronization point in the WAL stream, mark the table as
                                283                 :  * SYNCDONE and finish.
                                284                 :  */
                                285                 : static void
 2208 peter_e                   286 GIC         170 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 2208 peter_e                   287 ECB             : {
 2208 peter_e                   288 GIC         170 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 2208 peter_e                   289 ECB             : 
 2208 peter_e                   290 GIC         170 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
 2208 peter_e                   291 CBC         170 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
 2208 peter_e                   292 ECB             :     {
                                293                 :         TimeLineID  tli;
  786 akapila                   294 GIC         149 :         char        syncslotname[NAMEDATALEN] = {0};
  222 akapila                   295 GNC         149 :         char        originname[NAMEDATALEN] = {0};
 2208 peter_e                   296 ECB             : 
 2133 peter_e                   297 CBC         149 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 2208 peter_e                   298 GIC         149 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
 2208 peter_e                   299 ECB             : 
 2208 peter_e                   300 CBC         149 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                301                 : 
  786 akapila                   302 ECB             :         /*
                                303                 :          * UpdateSubscriptionRelState must be called within a transaction.
                                304                 :          */
  786 akapila                   305 GIC         149 :         if (!IsTransactionState())
  786 akapila                   306 CBC         149 :             StartTransactionCommand();
  786 akapila                   307 ECB             : 
 1829 peter_e                   308 GIC         149 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 1829 peter_e                   309 CBC         149 :                                    MyLogicalRepWorker->relid,
                                310             149 :                                    MyLogicalRepWorker->relstate,
                                311             149 :                                    MyLogicalRepWorker->relstate_lsn);
 2208 peter_e                   312 ECB             : 
                                313                 :         /*
                                314                 :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
                                315                 :          * the slot.
                                316                 :          */
  697 alvherre                  317 GIC         149 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
  786 akapila                   318 ECB             : 
                                319                 :         /*
                                320                 :          * Clean up the tablesync slot.
                                321                 :          *
                                322                 :          * This has to be done after updating the state because otherwise, if
                                323                 :          * there is an error while doing the database operations, we won't be
                                324                 :          * able to roll back the dropped slot.
                                325                 :          */
  786 akapila                   326 GIC         148 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
  786 akapila                   327 CBC         148 :                                         MyLogicalRepWorker->relid,
  783 akapila                   328 ECB             :                                         syncslotname,
                                329                 :                                         sizeof(syncslotname));
                                330                 : 
                                331                 :         /*
                                332                 :          * It is important to raise an error if we are unable to drop the slot;
                                333                 :          * otherwise, it won't be dropped until the corresponding subscription
                                334                 :          * is dropped. Hence we pass missing_ok = false.
                                335                 :          */
  697 alvherre                  336 GIC         148 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
  786 akapila                   337 ECB             : 
  209 akapila                   338 GNC         148 :         CommitTransactionCommand();
                                339             148 :         pgstat_report_stat(false);
                                340                 : 
                                341                 :         /*
                                342                 :          * Start a new transaction to clean up the tablesync origin tracking.
                                343                 :          * This transaction will be ended within finish_sync_worker().
                                344                 :          * Even if we fail to remove the origin here, the apply worker will
                                345                 :          * clean it up afterward.
                                346                 :          *
                                347                 :          * We need to do this after the table state is set to SYNCDONE.
                                348                 :          * Otherwise, if an error occurs while performing the database
                                349                 :          * operation, the worker will be restarted, but the in-memory state of
                                350                 :          * replication progress (remote_lsn), which was cleared before the
                                351                 :          * restart, won't be rolled back. The restarted worker would then use
                                352                 :          * an invalid replication progress state, resulting in replay of
                                353                 :          * transactions that have already been applied.
                                354                 :          */
                                355             148 :         StartTransactionCommand();
                                356                 : 
  180                           357             148 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                358             148 :                                            MyLogicalRepWorker->relid,
                                359                 :                                            originname,
                                360                 :                                            sizeof(originname));
                                361                 : 
                                362                 :         /*
                                363                 :          * Resetting the origin session removes the ownership of the slot.
                                364                 :          * This is needed to allow the origin to be dropped.
                                365                 :          */
  209                           366             148 :         replorigin_session_reset();
                                367             148 :         replorigin_session_origin = InvalidRepOriginId;
                                368             148 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
                                369             148 :         replorigin_session_origin_timestamp = 0;
                                370                 : 
                                371                 :         /*
                                372                 :          * Drop the tablesync's origin tracking if it exists.
                                373                 :          *
                                374                 :          * The user may be concurrently refreshing the subscription, which
                                375                 :          * removes the table state and its origin, or the apply worker may
                                376                 :          * already have removed this origin. So we pass
                                377                 :          * missing_ok = true.
                                378                 :          */
                                379             148 :         replorigin_drop_by_name(originname, true, false);
                                380                 : 
 2208 peter_e                   381 GIC         148 :         finish_sync_worker();
 2208 peter_e                   382 ECB             :     }
                                383                 :     else
 2208 peter_e                   384 GIC          21 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                385              21 : }
                                386                 : 
                                387                 : /*
                                388                 :  * Handle table synchronization cooperation from the apply worker.
                                389                 :  *
                                390                 :  * Walk over all subscription tables that are individually tracked by the
                                391                 :  * apply process (currently, all that have state other than
                                392                 :  * SUBREL_STATE_READY) and manage synchronization for them.
                                393                 :  *
                                394                 :  * If there are tables that need synchronizing and are not being synchronized
                                395                 :  * yet, start sync workers for them (if there are free slots for sync
                                396                 :  * workers).  To prevent starting the sync worker for the same relation at a
                                397                 :  * high frequency after a failure, we store its last start time with each sync
                                398                 :  * state info.  We start the sync worker for the same relation after waiting
 2173 peter_e                   399 ECB             :  * at least wal_retrieve_retry_interval.
                                400                 :  *
 2208                           401                 :  * For tables that are being synchronized already, check if sync workers
 2133                           402                 :  * either need action from the apply worker or have finished.  This is the
                                403                 :  * SYNCWAIT to CATCHUP transition.
                                404                 :  *
                                405                 :  * If the synchronization position is reached (SYNCDONE), then the table can
                                406                 :  * be marked as READY and is no longer tracked.
                                407                 :  */
                                408                 : static void
 2208 peter_e                   409 GIC        2476 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 2208 peter_e                   410 ECB             : {
 2173                           411                 :     struct tablesync_start_time_mapping
                                412                 :     {
                                413                 :         Oid         relid;
                                414                 :         TimestampTz last_start_time;
                                415                 :     };
                                416                 :     static HTAB *last_start_times = NULL;
                                417                 :     ListCell   *lc;
 2162 peter_e                   418 GIC        2476 :     bool        started_tx = false;
   93 tgl                       419 GNC        2476 :     bool        should_exit = false;
                                420                 : 
 2208 peter_e                   421 GIC        2476 :     Assert(!IsTransactionState());
                                422                 : 
                                423                 :     /* We need up-to-date sync state info for subscription tables here. */
  634 akapila                   424 CBC        2476 :     FetchTableStates(&started_tx);
                                425                 : 
 2173 peter_e                   426 ECB             :     /*
                                427                 :      * Prepare a hash table for tracking last start times of workers, to avoid
                                428                 :      * immediate restarts.  We don't need it if there are no tables that need
                                429                 :      * syncing.
                                430                 :      */
  235 tgl                       431 GNC        2476 :     if (table_states_not_ready != NIL && !last_start_times)
 2173 peter_e                   432 GIC          88 :     {
                                433                 :         HASHCTL     ctl;
                                434                 : 
                                435              88 :         ctl.keysize = sizeof(Oid);
                                436              88 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
                                437              88 :         last_start_times = hash_create("Logical replication table sync worker start times",
                                438                 :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
                                439                 :     }
                                440                 : 
                                441                 :     /*
                                442                 :      * Clean up the hash table when we're done with all tables (just to
                                443                 :      * release the bit of memory).
                                444                 :      */
  235 tgl                       445 GNC        2388 :     else if (table_states_not_ready == NIL && last_start_times)
                                446                 :     {
 2173 peter_e                   447 GIC          68 :         hash_destroy(last_start_times);
                                448              68 :         last_start_times = NULL;
                                449                 :     }
                                450                 : 
                                451                 :     /*
                                452                 :      * Process all tables that are being synchronized.
                                453                 :      */
  634 akapila                   454 CBC        3865 :     foreach(lc, table_states_not_ready)
 2208 peter_e                   455 ECB             :     {
 2153 bruce                     456 GIC        1389 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
                                457                 : 
 2208 peter_e                   458 CBC        1389 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
 2208 peter_e                   459 ECB             :         {
                                460                 :             /*
                                461                 :              * Apply has caught up to the position where the table sync has
                                462                 :              * finished.  Mark the table as ready so that the apply will just
                                463                 :              * continue to replicate it normally.
                                464                 :              */
 2208 peter_e                   465 GIC         144 :             if (current_lsn >= rstate->lsn)
                                466                 :             {
                                467                 :                 char        originname[NAMEDATALEN];
  209 akapila                   468 ECB             : 
 2208 peter_e                   469 GIC         144 :                 rstate->state = SUBREL_STATE_READY;
 2208 peter_e                   470 CBC         144 :                 rstate->lsn = current_lsn;
 2162                           471             144 :                 if (!started_tx)
                                472                 :                 {
 2162 peter_e                   473 GIC           2 :                     StartTransactionCommand();
                                474               2 :                     started_tx = true;
                                475                 :                 }
                                476                 : 
  786 akapila                   477 ECB             :                 /*
                                478                 :                  * Remove the tablesync origin tracking if it exists.
  209                           479                 :                  *
                                480                 :                  * The user may be concurrently refreshing the subscription,
                                481                 :                  * which removes the table state and its origin, or the
                                482                 :                  * tablesync worker may already have removed this origin. We
                                483                 :                  * can't rely on the tablesync worker to remove the origin
                                484                 :                  * tracking, because if an error occurs while dropping it, we
                                485                 :                  * won't restart the worker to drop the origin. So we pass
                                486                 :                  * missing_ok = true.
                                487                 :                  */
  180 akapila                   488 GNC         144 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                489                 :                                                    rstate->relid,
                                490                 :                                                    originname,
                                491                 :                                                    sizeof(originname));
  209 akapila                   492 CBC         144 :                 replorigin_drop_by_name(originname, true, false);
                                493                 : 
  209 akapila                   494 ECB             :                 /*
                                495                 :                  * Update the state to READY only after the origin cleanup.
                                496                 :                  */
 1829 peter_e                   497 GIC         144 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                498             144 :                                            rstate->relid, rstate->state,
                                499                 :                                            rstate->lsn);
                                500                 :             }
                                501                 :         }
                                502                 :         else
                                503                 :         {
                                504                 :             LogicalRepWorker *syncworker;
                                505                 : 
                                506                 :             /*
                                507                 :              * Look for a sync worker for this relation.
                                508                 :              */
 2208 peter_e                   509 CBC        1245 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                510                 : 
 2208 peter_e                   511 GIC        1245 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                512                 :                                                 rstate->relid, false);
 2109 tgl                       513 ECB             : 
 2208 peter_e                   514 GIC        1245 :             if (syncworker)
                                515                 :             {
                                516                 :                 /* Found one, update our copy of its state */
                                517             542 :                 SpinLockAcquire(&syncworker->relmutex);
 2208 peter_e                   518 CBC         542 :                 rstate->state = syncworker->relstate;
                                519             542 :                 rstate->lsn = syncworker->relstate_lsn;
 2109 tgl                       520 GIC         542 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                521                 :                 {
                                522                 :                     /*
                                523                 :                      * Sync worker is waiting for apply.  Tell the sync worker
                                524                 :                      * it can catch up now.
                                525                 :                      */
                                526             145 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
                                527             145 :                     syncworker->relstate_lsn =
                                528             145 :                         Max(syncworker->relstate_lsn, current_lsn);
                                529                 :                 }
 2208 peter_e                   530 CBC         542 :                 SpinLockRelease(&syncworker->relmutex);
                                531                 : 
 2109 tgl                       532 ECB             :                 /* If we told worker to catch up, wait for it. */
 2109 tgl                       533 GIC         542 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                534                 :                 {
 2109 tgl                       535 ECB             :                     /* Signal the sync worker, as it may be waiting for us. */
 2109 tgl                       536 GIC         145 :                     if (syncworker->proc)
                                537             145 :                         logicalrep_worker_wakeup_ptr(syncworker);
 2109 tgl                       538 ECB             : 
                                539                 :                     /* Now safe to release the LWLock */
 2109 tgl                       540 CBC         145 :                     LWLockRelease(LogicalRepWorkerLock);
 2109 tgl                       541 ECB             : 
                                542                 :                     /*
                                543                 :                      * Enter busy loop and wait for synchronization worker to
                                544                 :                      * reach expected state (or die trying).
                                545                 :                      */
 2109 tgl                       546 GIC         145 :                     if (!started_tx)
 2109 tgl                       547 ECB             :                     {
 2109 tgl                       548 LBC           0 :                         StartTransactionCommand();
                                549               0 :                         started_tx = true;
                                550                 :                     }
 2109 tgl                       551 ECB             : 
 2109 tgl                       552 GIC         145 :                     wait_for_relation_state_change(rstate->relid,
                                553                 :                                                    SUBREL_STATE_SYNCDONE);
 2109 tgl                       554 ECB             :                 }
                                555                 :                 else
 2109 tgl                       556 GIC         397 :                     LWLockRelease(LogicalRepWorkerLock);
 2208 peter_e                   557 ECB             :             }
                                558                 :             else
                                559                 :             {
                                560                 :                 /*
 2126 tgl                       561                 :                  * If there is no sync worker for this table yet, count
                                562                 :                  * running sync workers for this subscription, while we have
                                563                 :                  * the lock.
                                564                 :                  */
                                565                 :                 int         nsyncworkers =
 2109 tgl                       566 GIC         703 :                 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
 2208 peter_e                   567 ECB             : 
                                568                 :                 /* Now safe to release the LWLock */
 2109 tgl                       569 GBC         703 :                 LWLockRelease(LogicalRepWorkerLock);
 2208 peter_e                   570 EUB             : 
                                571                 :                 /*
                                572                 :                  * If there are free sync worker slot(s), start a new sync
 2109 tgl                       573 ECB             :                  * worker for the table.
                                574                 :                  */
 2109 tgl                       575 GIC         703 :                 if (nsyncworkers < max_sync_workers_per_subscription)
                                576                 :                 {
 2109 tgl                       577 CBC         159 :                     TimestampTz now = GetCurrentTimestamp();
                                578                 :                     struct tablesync_start_time_mapping *hentry;
                                579                 :                     bool        found;
                                580                 : 
 2109 tgl                       581 GIC         159 :                     hentry = hash_search(last_start_times, &rstate->relid,
                                582                 :                                          HASH_ENTER, &found);
                                583                 : 
                                584             170 :                     if (!found ||
                                585              11 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
                                586                 :                                                    wal_retrieve_retry_interval))
 2109 tgl                       587 ECB             :                     {
 2109 tgl                       588 GIC         153 :                         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
                                589             153 :                                                  MySubscription->oid,
 2109 tgl                       590 CBC         153 :                                                  MySubscription->name,
 2109 tgl                       591 GIC         153 :                                                  MyLogicalRepWorker->userid,
                                592                 :                                                  rstate->relid,
                                593                 :                                                  DSM_HANDLE_INVALID);
                                594             153 :                         hentry->last_start_time = now;
                                595                 :                     }
                                596                 :                 }
 2208 peter_e                   597 ECB             :             }
                                598                 :         }
                                599                 :     }
                                600                 : 
 2162 peter_e                   601 GIC        2476 :     if (started_tx)
                                602                 :     {
                                603                 :         /*
                                604                 :          * Even when the two_phase mode is requested by the user, it remains
                                605                 :          * as 'pending' until all tablesyncs have reached READY state.
                                606                 :          *
                                607                 :          * When this happens, we restart the apply worker and (if the
                                608                 :          * conditions are still ok) then the two_phase tri-state will become
                                609                 :          * 'enabled' at that time.
                                610                 :          *
                                611                 :          * Note: If the subscription has no tables then leave the state as
                                612                 :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
                                613                 :          * work.
                                614                 :          */
   93 tgl                       615 GNC         644 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
                                616                 :         {
                                617              22 :             CommandCounterIncrement();  /* make updates visible */
                                618              22 :             if (AllTablesyncsReady())
                                619                 :             {
                                620               5 :                 ereport(LOG,
                                621                 :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
                                622                 :                                 MySubscription->name)));
                                623               5 :                 should_exit = true;
                                624                 :             }
                                625                 :         }
                                626                 : 
 2162 peter_e                   627 CBC         644 :         CommitTransactionCommand();
  368 andres                    628 GIC         644 :         pgstat_report_stat(true);
                                629                 :     }
                                630                 : 
   93 tgl                       631 GNC        2476 :     if (should_exit)
                                632                 :     {
                                633                 :         /*
                                634                 :          * Reset the last-start time for this worker so that the launcher will
                                635                 :          * restart it without waiting for wal_retrieve_retry_interval.
                                636                 :          */
   77                           637               5 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
                                638                 : 
   93                           639               5 :         proc_exit(0);
                                640                 :     }
 2208 peter_e                   641 CBC        2471 : }
 2208 peter_e                   642 ECB             : 
                                643                 : /*
                                644                 :  * Process possible state change(s) of tables that are being synchronized.
                                645                 :  */
                                646                 : void
 2208 peter_e                   647 CBC        2668 : process_syncing_tables(XLogRecPtr current_lsn)
 2208 peter_e                   648 ECB             : {
                                649                 :     /*
                                650                 :      * Skip for parallel apply workers because they only operate on tables
                                651                 :      * that are in a READY state. See pa_can_start() and
                                652                 :      * should_apply_changes_for_rel().
                                653                 :      */
   90 akapila                   654 GNC        2668 :     if (am_parallel_apply_worker())
                                655              22 :         return;
                                656                 : 
 2208 peter_e                   657 GIC        2646 :     if (am_tablesync_worker())
                                658             170 :         process_syncing_tables_for_sync(current_lsn);
 2208 peter_e                   659 ECB             :     else
 2208 peter_e                   660 GIC        2476 :         process_syncing_tables_for_apply(current_lsn);
                                661                 : }
                                662                 : 
                                663                 : /*
                                664                 :  * Create list of columns for COPY based on logical relation mapping.
                                665                 :  */
 2208 peter_e                   666 ECB             : static List *
 2208 peter_e                   667 GIC         155 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
                                668                 : {
                                669             155 :     List       *attnamelist = NIL;
                                670                 :     int         i;
                                671                 : 
 2152                           672             410 :     for (i = 0; i < rel->remoterel.natts; i++)
                                673                 :     {
 2208                           674             255 :         attnamelist = lappend(attnamelist,
 2152                           675             255 :                               makeString(rel->remoterel.attnames[i]));
                                676                 :     }
                                677                 : 
                                678                 : 
 2208                           679             155 :     return attnamelist;
 2208 peter_e                   680 ECB             : }
                                681                 : 
                                682                 : /*
                                683                 :  * Data source callback for the COPY FROM, which reads from the remote
                                684                 :  * connection and passes the data back to our local COPY.
                                685                 :  */
                                686                 : static int
 2208 peter_e                   687 GIC       13865 : copy_read_data(void *outbuf, int minread, int maxread)
 2208 peter_e                   688 ECB             : {
 2153 bruce                     689 GIC       13865 :     int         bytesread = 0;
                                690                 :     int         avail;
                                691                 : 
  2126 peter_e                   692 ECB             :     /* If there is some leftover data from the previous read, use it. */
 2208 peter_e                   693 CBC       13865 :     avail = copybuf->len - copybuf->cursor;
 2208 peter_e                   694 GIC       13865 :     if (avail)
                                695                 :     {
 2208 peter_e                   696 LBC           0 :         if (avail > maxread)
 2208 peter_e                   697 UIC           0 :             avail = maxread;
                                698               0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                                699               0 :         copybuf->cursor += avail;
                                700               0 :         maxread -= avail;
                                701               0 :         bytesread += avail;
 2208 peter_e                   702 ECB             :     }
                                703                 : 
 2137 peter_e                   704 CBC       13866 :     while (maxread > 0 && bytesread < minread)
                                705                 :     {
 2208                           706           13866 :         pgsocket    fd = PGINVALID_SOCKET;
                                707                 :         int         len;
 2208 peter_e                   708 GIC       13866 :         char       *buf = NULL;
                                709                 : 
                                710                 :         for (;;)
                                711                 :         {
  2208 peter_e                   712 ECB             :             /* Try to read the data. */
  697 alvherre                  713 GIC       13866 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                                714                 : 
 2208 peter_e                   715           13866 :             CHECK_FOR_INTERRUPTS();
                                716                 : 
                                717           13866 :             if (len == 0)
                                718               1 :                 break;
 2208 peter_e                   719 CBC       13865 :             else if (len < 0)
                                720           13865 :                 return bytesread;
                                721                 :             else
 2208 peter_e                   722 ECB             :             {
                                723                 :                 /* Process the data */
 2208 peter_e                   724 GIC       13712 :                 copybuf->data = buf;
 2208 peter_e                   725 CBC       13712 :                 copybuf->len = len;
 2208 peter_e                   726 GIC       13712 :                 copybuf->cursor = 0;
                                727                 : 
                                728           13712 :                 avail = copybuf->len - copybuf->cursor;
                                729           13712 :                 if (avail > maxread)
 2208 peter_e                   730 UIC           0 :                     avail = maxread;
 2208 peter_e                   731 GIC       13712 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
 2208 peter_e                   732 CBC       13712 :                 outbuf = (void *) ((char *) outbuf + avail);
 2208 peter_e                   733 GIC       13712 :                 copybuf->cursor += avail;
 2208 peter_e                   734 CBC       13712 :                 maxread -= avail;
 2208 peter_e                   735 GIC       13712 :                 bytesread += avail;
                                736                 :             }
 2208 peter_e                   737 ECB             : 
 2208 peter_e                   738 GIC       13712 :             if (maxread <= 0 || bytesread >= minread)
 2208 peter_e                   739 CBC       13712 :                 return bytesread;
 2208 peter_e                   740 ECB             :         }
                                741                 : 
                                742                 :         /*
                                743                 :          * Wait for more data or latch.
                                744                 :          */
 1598 tmunro                    745 GIC           1 :         (void) WaitLatchOrSocket(MyLatch,
                                746                 :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
                                747                 :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                748                 :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
                                749                 : 
 2133 andres                    750               1 :         ResetLatch(MyLatch);
                                751                 :     }
 2208 peter_e                   752 ECB             : 
 2208 peter_e                   753 UIC           0 :     return bytesread;
 2208 peter_e                   754 ECB             : }
                                755                 : 
                                756                 : 
                                757                 : /*
                                 758                 :  * Get information about the remote relation, similar to what the RELATION
  411 akapila                   759                 :  * message provides during replication. This function also returns the relation
                                760                 :  * qualifications to be used in the COPY command.
 2208 peter_e                   761 EUB             :  */
                                762                 : static void
 2208 peter_e                   763 GBC         155 : fetch_remote_table_info(char *nspname, char *relname,
  411 akapila                   764 EUB             :                         LogicalRepRelation *lrel, List **qual)
 2208 peter_e                   765                 : {
 2153 bruce                     766                 :     WalRcvExecResult *res;
                                767                 :     StringInfoData cmd;
                                768                 :     TupleTableSlot *slot;
 1116 peter                     769 CBC         155 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
  379 tomas.vondra              770 GIC         155 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
  411 akapila                   771 CBC         155 :     Oid         qualRow[] = {TEXTOID};
                                772                 :     bool        isnull;
 2153 bruce                     773 ECB             :     int         natt;
                                774                 :     ListCell   *lc;
  379 tomas.vondra              775 GIC         155 :     Bitmapset  *included_cols = NULL;
                                776                 : 
 2208 peter_e                   777             155 :     lrel->nspname = nspname;
 2208 peter_e                   778 CBC         155 :     lrel->relname = relname;
                                779                 : 
  2208 peter_e                   780 ECB             :     /* First fetch Oid, replica identity, and relkind. */
 2208 peter_e                   781 GIC         155 :     initStringInfo(&cmd);
 1116 peter                     782 CBC         155 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
 2153 bruce                     783 ECB             :                      "  FROM pg_catalog.pg_class c"
                                784                 :                      "  INNER JOIN pg_catalog.pg_namespace n"
                                785                 :                      "        ON (c.relnamespace = n.oid)"
                                786                 :                      " WHERE n.nspname = %s"
                                787                 :                      "   AND c.relname = %s",
                                788                 :                      quote_literal_cstr(nspname),
                                789                 :                      quote_literal_cstr(relname));
  697 alvherre                  790 CBC         155 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
  697 alvherre                  791 ECB             :                       lengthof(tableRow), tableRow);
                                792                 : 
 2208 peter_e                   793 CBC         155 :     if (res->status != WALRCV_OK_TUPLES)
 2208 peter_e                   794 LBC           0 :         ereport(ERROR,
  662 tgl                       795 EUB             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
  662 tgl                       796 ECB             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
 2208 peter_e                   797                 :                         nspname, relname, res->err)));
                                798                 : 
 1606 andres                    799 CBC         155 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 2208 peter_e                   800             155 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 2208 peter_e                   801 UIC           0 :         ereport(ERROR,
                                802                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
  662 tgl                       803 ECB             :                  errmsg("table \"%s.%s\" not found on publisher",
 2208 peter_e                   804                 :                         nspname, relname)));
                                805                 : 
 2208 peter_e                   806 GIC         155 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
                                807             155 :     Assert(!isnull);
                                808             155 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
                                809             155 :     Assert(!isnull);
 1116 peter                     810 CBC         155 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 1116 peter                     811 GIC         155 :     Assert(!isnull);
                                812                 : 
 2208 peter_e                   813             155 :     ExecDropSingleTupleTableSlot(slot);
                                814             155 :     walrcv_clear_result(res);
 2208 peter_e                   815 ECB             : 
                                816                 : 
                                817                 :     /*
  379 tomas.vondra              818 EUB             :      * Get column lists for each relation.
                                819                 :      *
                                820                 :      * We need to do this before fetching info about column names and types,
                                821                 :      * so that we can skip columns that should not be replicated.
                                822                 :      */
  379 tomas.vondra              823 GIC         155 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
                                824                 :     {
                                825                 :         WalRcvExecResult *pubres;
                                826                 :         TupleTableSlot *tslot;
  311 akapila                   827             155 :         Oid         attrsRow[] = {INT2VECTOROID};
  379 tomas.vondra              828 ECB             :         StringInfoData pub_names;
                                829                 : 
  379 tomas.vondra              830 GIC         155 :         initStringInfo(&pub_names);
                                831             495 :         foreach(lc, MySubscription->publications)
                                832                 :         {
  232 drowley                   833             340 :             if (foreach_current_index(lc) > 0)
  215 drowley                   834 GNC         185 :                 appendStringInfoString(&pub_names, ", ");
  379 tomas.vondra              835 CBC         340 :             appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
  379 tomas.vondra              836 ECB             :         }
                                837                 : 
                                838                 :         /*
                                839                 :          * Fetch info about column lists for the relation (from all the
                                840                 :          * publications).
                                841                 :          */
  379 tomas.vondra              842 GIC         155 :         resetStringInfo(&cmd);
  379 tomas.vondra              843 CBC         155 :         appendStringInfo(&cmd,
  311 akapila                   844 ECB             :                          "SELECT DISTINCT"
                                845                 :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
                                846                 :                          "   THEN NULL ELSE gpt.attrs END)"
  325                           847                 :                          "  FROM pg_publication p,"
  311                           848                 :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
                                849                 :                          "  pg_class c"
                                850                 :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
                                851                 :                          "   AND p.pubname IN ( %s )",
                                852                 :                          lrel->remoteid,
                                853                 :                          pub_names.data);
                                854                 : 
  379 tomas.vondra              855 GIC         155 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
  379 tomas.vondra              856 ECB             :                              lengthof(attrsRow), attrsRow);
                                857                 : 
  379 tomas.vondra              858 GIC         155 :         if (pubres->status != WALRCV_OK_TUPLES)
  379 tomas.vondra              859 LBC           0 :             ereport(ERROR,
  379 tomas.vondra              860 EUB             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
                                861                 :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
                                862                 :                             nspname, relname, pubres->err)));
                                863                 : 
                                864                 :         /*
  311 akapila                   865 ECB             :          * We don't support the case where the column list is different for
                                866                 :          * the same table when combining publications. See comments atop
  311 akapila                   867 EUB             :          * fetch_table_list. So there should be only one row returned.
                                868                 :          * Although we already checked this when creating the subscription, we
                                869                 :          * still need to check here in case the column list was changed after
                                870                 :          * creating the subscription and before the sync worker is started.
                                871                 :          */
  311 akapila                   872 CBC         155 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
  311 akapila                   873 LBC           0 :             ereport(ERROR,
  311 akapila                   874 ECB             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                875                 :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                                876                 :                            nspname, relname));
                                877                 : 
                                878                 :         /*
                                879                 :          * Get the column list and build a single bitmap with the attnums.
                                880                 :          *
                                881                 :          * If we find a NULL value, it means all the columns should be
                                882                 :          * replicated.
                                883                 :          */
  232 drowley                   884 GIC         155 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
                                885             155 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
                                886                 :         {
                                887             155 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
                                888                 : 
  311 akapila                   889 CBC         155 :             if (!isnull)
                                890                 :             {
                                891                 :                 ArrayType  *arr;
                                892                 :                 int         nelems;
  311 akapila                   893 ECB             :                 int16      *elems;
                                894                 : 
  311 akapila                   895 GIC          19 :                 arr = DatumGetArrayTypeP(cfval);
  311 akapila                   896 CBC          19 :                 nelems = ARR_DIMS(arr)[0];
                                897              19 :                 elems = (int16 *) ARR_DATA_PTR(arr);
                                898                 : 
                                899              53 :                 for (natt = 0; natt < nelems; natt++)
                                900              34 :                     included_cols = bms_add_member(included_cols, elems[natt]);
  311 akapila                   901 ECB             :             }
                                902                 : 
  232 drowley                   903 GIC         155 :             ExecClearTuple(tslot);
                                904                 :         }
                                905             155 :         ExecDropSingleTupleTableSlot(tslot);
                                906                 : 
  379 tomas.vondra              907             155 :         walrcv_clear_result(pubres);
  379 tomas.vondra              908 ECB             : 
  379 tomas.vondra              909 CBC         155 :         pfree(pub_names.data);
                                910                 :     }
                                911                 : 
                                912                 :     /*
                                913                 :      * Now fetch column names and types.
                                914                 :      */
 2208 peter_e                   915 GIC         155 :     resetStringInfo(&cmd);
                                916             155 :     appendStringInfo(&cmd,
                                917                 :                      "SELECT a.attnum,"
                                918                 :                      "       a.attname,"
                                919                 :                      "       a.atttypid,"
                                920                 :                      "       a.attnum = ANY(i.indkey)"
 2208 peter_e                   921 ECB             :                      "  FROM pg_catalog.pg_attribute a"
                                922                 :                      "  LEFT JOIN pg_catalog.pg_index i"
                                923                 :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
                                924                 :                      " WHERE a.attnum > 0::pg_catalog.int2"
 1471 peter                     925 EUB             :                      "   AND NOT a.attisdropped %s"
                                926                 :                      "   AND a.attrelid = %u"
                                927                 :                      " ORDER BY a.attnum",
                                928                 :                      lrel->remoteid,
  697 alvherre                  929 GIC         155 :                      (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
                                930                 :                       "AND a.attgenerated = ''" : ""),
                                931                 :                      lrel->remoteid);
                                932             155 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                933                 :                       lengthof(attrRow), attrRow);
                                934                 : 
 2208 peter_e                   935             155 :     if (res->status != WALRCV_OK_TUPLES)
 2208 peter_e                   936 UIC           0 :         ereport(ERROR,
                                937                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
  662 tgl                       938 ECB             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
 2208 peter_e                   939 EUB             :                         nspname, relname, res->err)));
                                940                 : 
                                941                 :     /* We don't know the number of rows coming, so allocate enough space. */
 2208 peter_e                   942 GIC         155 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
                                943             155 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
                                944             155 :     lrel->attkeys = NULL;
                                945                 : 
                                946                 :     /*
                                947                 :      * Store the columns as a list of names.  Ignore those that are not
                                948                 :      * present in the column list, if there is one.
                                949                 :      */
 2208 peter_e                   950 CBC         155 :     natt = 0;
 1606 andres                    951             155 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 2208 peter_e                   952 GIC         432 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 2208 peter_e                   953 ECB             :     {
                                954                 :         char       *rel_colname;
  379 tomas.vondra              955                 :         AttrNumber  attnum;
                                956                 : 
  379 tomas.vondra              957 GIC         277 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
                                958             277 :         Assert(!isnull);
                                959                 : 
                                960                 :         /* If the column is not in the column list, skip it. */
  379 tomas.vondra              961 CBC         277 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
  379 tomas.vondra              962 ECB             :         {
  379 tomas.vondra              963 CBC          22 :             ExecClearTuple(slot);
  379 tomas.vondra              964 GIC          22 :             continue;
  379 tomas.vondra              965 ECB             :         }
                                966                 : 
  379 tomas.vondra              967 GIC         255 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 2208 peter_e                   968             255 :         Assert(!isnull);
  379 tomas.vondra              969 ECB             : 
  379 tomas.vondra              970 GIC         255 :         lrel->attnames[natt] = rel_colname;
  379 tomas.vondra              971 CBC         255 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 2208 peter_e                   972 GIC         255 :         Assert(!isnull);
  379 tomas.vondra              973 ECB             : 
  379 tomas.vondra              974 GIC         255 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 2208 peter_e                   975 CBC          94 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
                                976                 : 
                                977                 :         /* Should never happen. */
 2208 peter_e                   978 GIC         255 :         if (++natt >= MaxTupleAttributeNumber)
 2208 peter_e                   979 UIC           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
                                980                 :                  nspname, relname);
 2208 peter_e                   981 ECB             : 
 2208 peter_e                   982 CBC         255 :         ExecClearTuple(slot);
                                983                 :     }
 2208 peter_e                   984 GIC         155 :     ExecDropSingleTupleTableSlot(slot);
                                985                 : 
                                986             155 :     lrel->natts = natt;
                                987                 : 
                                988             155 :     walrcv_clear_result(res);
                                989                 : 
                                990                 :     /*
                                991                 :      * Get relation's row filter expressions. DISTINCT avoids the same
                                992                 :      * expression of a table in multiple publications from being included
                                993                 :      * multiple times in the final expression.
                                994                 :      *
  411 akapila                   995 ECB             :      * We need to copy the row even if it matches just one of the
                                996                 :      * publications, so we later combine all the quals with OR.
                                997                 :      *
                                 998                 :      * For initial synchronization, row filtering can be ignored in the following
                                999                 :      * cases:
                               1000                 :      *
                               1001                 :      * 1) one of the subscribed publications for the table hasn't specified
  411 akapila                  1002 EUB             :      * any row filter
                               1003                 :      *
                               1004                 :      * 2) one of the subscribed publications has puballtables set to true
                               1005                 :      *
                               1006                 :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
                               1007                 :      * that includes this relation
  411 akapila                  1008 ECB             :      */
  411 akapila                  1009 CBC         155 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
  411 akapila                  1010 ECB             :     {
                               1011                 :         StringInfoData pub_names;
                               1012                 : 
                               1013                 :         /* Build the pubname list. */
  411 akapila                  1014 GIC         155 :         initStringInfo(&pub_names);
                               1015             495 :         foreach(lc, MySubscription->publications)
  411 akapila                  1016 ECB             :         {
  411 akapila                  1017 CBC         340 :             char       *pubname = strVal(lfirst(lc));
  411 akapila                  1018 ECB             : 
  232 drowley                  1019 GIC         340 :             if (foreach_current_index(lc) > 0)
  411 akapila                  1020             185 :                 appendStringInfoString(&pub_names, ", ");
                               1021                 : 
                               1022             340 :             appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
  411 akapila                  1023 ECB             :         }
                               1024                 : 
                               1025                 :         /* Check for row filters. */
  411 akapila                  1026 GIC         155 :         resetStringInfo(&cmd);
  411 akapila                  1027 CBC         155 :         appendStringInfo(&cmd,
                               1028                 :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
  325 akapila                  1029 ECB             :                          "  FROM pg_publication p,"
  411                          1030                 :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
                               1031                 :                          " WHERE gpt.relid = %u"
                               1032                 :                          "   AND p.pubname IN ( %s )",
                               1033                 :                          lrel->remoteid,
                               1034                 :                          pub_names.data);
                               1035                 : 
  411 akapila                  1036 CBC         155 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
  411 akapila                  1037 ECB             : 
  411 akapila                  1038 CBC         155 :         if (res->status != WALRCV_OK_TUPLES)
  411 akapila                  1039 UIC           0 :             ereport(ERROR,
  411 akapila                  1040 ECB             :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
                               1041                 :                             nspname, relname, res->err)));
                               1042                 : 
                               1043                 :         /*
                               1044                 :          * Multiple row filter expressions for the same table will be combined
  411 akapila                  1045 EUB             :          * by COPY using OR. If any of the filter expressions for this table
                               1046                 :          * are null, it means the whole table will be copied. In this case it
                               1047                 :          * is not necessary to construct a unified row filter expression at
  411 akapila                  1048 ECB             :          * all.
                               1049                 :          */
  411 akapila                  1050 CBC         155 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
  411 akapila                  1051 GIC         169 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
  411 akapila                  1052 ECB             :         {
  411 akapila                  1053 GIC         159 :             Datum       rf = slot_getattr(slot, 1, &isnull);
  411 akapila                  1054 ECB             : 
  411 akapila                  1055 GIC         159 :             if (!isnull)
                               1056              14 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
                               1057                 :             else
                               1058                 :             {
                                1059                 :                 /* Ignore filters and clean up as necessary. */
                               1060             145 :                 if (*qual)
                               1061                 :                 {
                               1062               3 :                     list_free_deep(*qual);
                               1063               3 :                     *qual = NIL;
                               1064                 :                 }
                               1065             145 :                 break;
                               1066                 :             }
                               1067                 : 
                               1068              14 :             ExecClearTuple(slot);
                               1069                 :         }
                               1070             155 :         ExecDropSingleTupleTableSlot(slot);
                               1071                 : 
                               1072             155 :         walrcv_clear_result(res);
                               1073                 :     }
                               1074                 : 
 2208 peter_e                  1075 CBC         155 :     pfree(cmd.data);
 2208 peter_e                  1076 GIC         155 : }
                               1077                 : 
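The row-filter rule described in the loop above (one NULL filter means the whole table is copied, so all collected filters are discarded) can be sketched outside the backend. This is a minimal illustration only: `sketch_collect_row_filters` is a hypothetical helper, plain C string arrays stand in for the tuplestore and `List` used by the real code, and a NULL array element stands in for an SQL NULL filter.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper, not part of PostgreSQL.
 *
 * Copy filter pointers from "filters" into "out" and return how many were
 * kept.  If any input filter is NULL, the table is published without a row
 * filter somewhere, so no unified filter is needed and 0 is returned.
 * "out" must have room for nfilters entries.
 */
static size_t
sketch_collect_row_filters(const char *const *filters, size_t nfilters,
                           const char **out)
{
    size_t      nkept = 0;

    for (size_t i = 0; i < nfilters; i++)
    {
        if (filters[i] == NULL)
            return 0;           /* ignore everything collected so far */
        out[nkept++] = filters[i];
    }
    return nkept;
}
```

A result of 0 corresponds to `*qual == NIL` in the listing, which makes copy_table() fall back to a plain COPY for regular tables.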
                               1078                 : /*
                               1079                 :  * Copy existing data of a table from publisher.
 2208 peter_e                  1080 ECB             :  *
                               1081                 :  * Caller is responsible for locking the local relation.
                               1082                 :  */
                               1083                 : static void
 2208 peter_e                  1084 GIC         155 : copy_table(Relation rel)
 2208 peter_e                  1085 ECB             : {
                               1086                 :     LogicalRepRelMapEntry *relmapentry;
                               1087                 :     LogicalRepRelation lrel;
  411 akapila                  1088 CBC         155 :     List       *qual = NIL;
                               1089                 :     WalRcvExecResult *res;
                               1090                 :     StringInfoData cmd;
                               1091                 :     CopyFromState cstate;
 2208 peter_e                  1092 ECB             :     List       *attnamelist;
 2183                          1093                 :     ParseState *pstate;
   17 akapila                  1094 GNC         155 :     List       *options = NIL;
                               1095                 : 
                               1096                 :     /* Get the publisher relation info. */
 2208 peter_e                  1097 GIC         155 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
  411 akapila                  1098             155 :                             RelationGetRelationName(rel), &lrel, &qual);
                               1099                 : 
                               1100                 :     /* Put the relation into relmap. */
 2208 peter_e                  1101             155 :     logicalrep_relmap_update(&lrel);
                               1102                 : 
 2208 peter_e                  1103 ECB             :     /* Map the publisher relation to local one. */
 2208 peter_e                  1104 GIC         155 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 2208 peter_e                  1105 CBC         155 :     Assert(rel == relmapentry->localrel);
 2208 peter_e                  1106 EUB             : 
                               1107                 :     /* Start copy on the publisher. */
 2208 peter_e                  1108 GIC         155 :     initStringInfo(&cmd);
                               1109                 : 
                               1110                 :     /* Regular table with no row filter */
  411 akapila                  1111             155 :     if (lrel.relkind == RELKIND_RELATION && qual == NIL)
                               1112                 :     {
  379 tomas.vondra             1113             132 :         appendStringInfo(&cmd, "COPY %s (",
 1116 peter                    1114             132 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1115                 : 
                               1116                 :         /*
  332 tgl                      1117 ECB             :          * XXX Do we need to list the columns in all cases? Maybe we're
                               1118                 :          * replicating all columns?
                               1119                 :          */
  379 tomas.vondra             1120 CBC         352 :         for (int i = 0; i < lrel.natts; i++)
                               1121                 :         {
                               1122             220 :             if (i > 0)
                               1123              88 :                 appendStringInfoString(&cmd, ", ");
                               1124                 : 
  379 tomas.vondra             1125 GIC         220 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1126                 :         }
  379 tomas.vondra             1127 ECB             : 
  215 drowley                  1128 GNC         132 :         appendStringInfoString(&cmd, ") TO STDOUT");
  379 tomas.vondra             1129 ECB             :     }
 1116 peter                    1130                 :     else
                               1131                 :     {
                               1132                 :         /*
                               1133                 :          * For non-tables and tables with row filters, we need to do COPY
                               1134                 :          * (SELECT ...), but we can't just do SELECT * because we need to not
  411 akapila                  1135                 :          * copy generated columns. For tables with any row filters, build a
                               1136                 :          * SELECT query with OR'ed row filters for COPY.
 1116 peter                    1137                 :          */
  906 drowley                  1138 GIC          23 :         appendStringInfoString(&cmd, "COPY (SELECT ");
 1116 peter                    1139 CBC          58 :         for (int i = 0; i < lrel.natts; i++)
                               1140                 :         {
 1116 peter                    1141 GIC          35 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
 1116 peter                    1142 CBC          35 :             if (i < lrel.natts - 1)
                               1143              12 :                 appendStringInfoString(&cmd, ", ");
                               1144                 :         }
                               1145                 : 
  411 akapila                  1146 GIC          23 :         appendStringInfoString(&cmd, " FROM ");
                               1147                 : 
                               1148                 :         /*
                               1149                 :          * For regular tables, make sure we don't copy data from children that
                               1150                 :          * inherit the named table, as those will be copied separately.
  411 akapila                  1151 ECB             :          */
  411 akapila                  1152 GIC          23 :         if (lrel.relkind == RELKIND_RELATION)
                               1153               7 :             appendStringInfoString(&cmd, "ONLY ");
                               1154                 : 
  411 akapila                  1155 CBC          23 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1156                 :         /* list of OR'ed filters */
  411 akapila                  1157 GIC          23 :         if (qual != NIL)
                               1158                 :         {
                               1159                 :             ListCell   *lc;
                               1160              10 :             char       *q = strVal(linitial(qual));
  411 akapila                  1161 ECB             : 
  411 akapila                  1162 GIC          10 :             appendStringInfo(&cmd, " WHERE %s", q);
                               1163              11 :             for_each_from(lc, qual, 1)
  411 akapila                  1164 ECB             :             {
  411 akapila                  1165 CBC           1 :                 q = strVal(lfirst(lc));
  411 akapila                  1166 GIC           1 :                 appendStringInfo(&cmd, " OR %s", q);
                               1167                 :             }
  411 akapila                  1168 CBC          10 :             list_free_deep(qual);
                               1169                 :         }
                               1170                 : 
                               1171              23 :         appendStringInfoString(&cmd, ") TO STDOUT");
 1116 peter                    1172 ECB             :     }
                               1173                 : 
                               1174                 :     /*
                               1175                 :      * Prior to v16, initial table synchronization will use text format even
                               1176                 :      * if the binary option is enabled for a subscription.
                               1177                 :      */
   17 akapila                  1178 GNC         155 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
                               1179             155 :         MySubscription->binary)
                               1180                 :     {
                               1181               5 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
                               1182               5 :         options = list_make1(makeDefElem("format",
                               1183                 :                                          (Node *) makeString("binary"), -1));
                               1184                 :     }
                               1185                 : 
  697 alvherre                 1186 GIC         155 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 2208 peter_e                  1187             155 :     pfree(cmd.data);
 2208 peter_e                  1188 CBC         155 :     if (res->status != WALRCV_OK_COPY_OUT)
 2208 peter_e                  1189 UIC           0 :         ereport(ERROR,
                               1190                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
  662 tgl                      1191 ECB             :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                               1192                 :                         lrel.nspname, lrel.relname, res->err)));
 2208 peter_e                  1193 CBC         155 :     walrcv_clear_result(res);
 2208 peter_e                  1194 ECB             : 
 2208 peter_e                  1195 GIC         155 :     copybuf = makeStringInfo();
                               1196                 : 
 2183                          1197             155 :     pstate = make_parsestate(NULL);
 1193 tgl                      1198             155 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                               1199                 :                                          NULL, false, false);
 2183 peter_e                  1200 ECB             : 
 2208 peter_e                  1201 GIC         155 :     attnamelist = make_copy_attnamelist(relmapentry);
   17 akapila                  1202 GNC         155 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 2208 peter_e                  1203 ECB             : 
                               1204                 :     /* Do the copy */
 2208 peter_e                  1205 CBC         154 :     (void) CopyFrom(cstate);
                               1206                 : 
 2208 peter_e                  1207 GIC         148 :     logicalrep_rel_close(relmapentry, NoLock);
 2208 peter_e                  1208 CBC         148 : }
                               1209                 : 
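The two command shapes that copy_table() sends to the publisher can be sketched with ordinary C string handling. This is a simplified illustration under stated assumptions: `sketch_append` and `sketch_build_copy_cmd` are hypothetical names, a fixed caller-supplied buffer replaces `StringInfo`, and identifier quoting (`quote_identifier`, `quote_qualified_identifier`) and the optional `WITH (FORMAT binary)` suffix are omitted.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Append formatted text to a NUL-terminated buffer; truncates if full. */
static void
sketch_append(char *buf, size_t size, const char *fmt, ...)
{
    size_t      len = strlen(buf);
    va_list     ap;

    va_start(ap, fmt);
    vsnprintf(buf + len, size - len, fmt, ap);
    va_end(ap);
}

/*
 * Hypothetical helper: build the COPY command text.  "size" must be > 0.
 * A plain table without row filters gets "COPY tbl (cols) TO STDOUT";
 * anything else gets "COPY (SELECT cols FROM [ONLY] tbl [WHERE ...])".
 */
static void
sketch_build_copy_cmd(char *buf, size_t size, const char *table,
                      int is_plain_table,
                      const char *const *cols, int ncols,
                      const char *const *quals, int nquals)
{
    buf[0] = '\0';

    if (is_plain_table && nquals == 0)
    {
        sketch_append(buf, size, "COPY %s (", table);
        for (int i = 0; i < ncols; i++)
            sketch_append(buf, size, "%s%s", i > 0 ? ", " : "", cols[i]);
        sketch_append(buf, size, ") TO STDOUT");
        return;
    }

    sketch_append(buf, size, "COPY (SELECT ");
    for (int i = 0; i < ncols; i++)
        sketch_append(buf, size, "%s%s", i > 0 ? ", " : "", cols[i]);

    /* ONLY keeps inheritance children out; they are copied separately. */
    sketch_append(buf, size, " FROM %s%s", is_plain_table ? "ONLY " : "", table);

    /* Multiple row filters for the same table are combined with OR. */
    for (int i = 0; i < nquals; i++)
        sketch_append(buf, size, "%s%s", i == 0 ? " WHERE " : " OR ", quals[i]);

    sketch_append(buf, size, ") TO STDOUT");
}
```

With columns `a, b` on `public.t` and filters `(a > 1)` and `(b < 5)`, the sketch yields `COPY (SELECT a, b FROM ONLY public.t WHERE (a > 1) OR (b < 5)) TO STDOUT`.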
                               1210                 : /*
                               1211                 :  * Determine the tablesync slot name.
                               1212                 :  *
                               1213                 :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
                               1214                 :  * on slot name length. We append system_identifier to avoid slot_name
                               1215                 :  * collision with subscriptions in other clusters. With the current scheme
                               1216                 :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 1 + 20 characters, plus the
                               1217                 :  * terminating '\0'), the maximum length of slot_name will be 50.
  786 akapila                  1218 ECB             :  *
  783                          1219                 :  * The returned slot name is stored in the supplied buffer (syncslotname) with
                               1220                 :  * the given size.
  786                          1221                 :  *
                               1222                 :  * Note: We don't use the subscription slot name as part of tablesync slot name
                               1223                 :  * because we are responsible for cleaning up these slots and it could become
                               1224                 :  * impossible to recalculate what name to clean up if the subscription slot name
                               1225                 :  * had changed.
                               1226                 :  */
                               1227                 : void
  786 akapila                  1228 GIC         305 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                               1229                 :                                 char *syncslotname, Size szslot)
                               1230                 : {
  783                          1231             305 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
  783 akapila                  1232 ECB             :              relid, GetSystemIdentifier());
  786 akapila                  1233 CBC         305 : }
                               1234                 : 
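The length bound in the comment above can be checked with a standalone sketch that formats the name using the largest possible values. Assumptions: `sketch_tablesync_slot_name` is a hypothetical stand-in for ReplicationSlotNameForTablesync(), `uint32_t` stands in for `Oid`, the system identifier is passed in rather than read from the control file, and `SKETCH_NAMEDATALEN` mirrors the default `NAMEDATALEN` of 64.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_NAMEDATALEN 64   /* assumed default NAMEDATALEN */

/*
 * Hypothetical stand-in: format "pg_<suboid>_sync_<relid>_<sysid>" into buf
 * and return the number of characters the full name needs (excluding the
 * terminating NUL), as reported by snprintf.
 */
static int
sketch_tablesync_slot_name(uint32_t suboid, uint32_t relid, uint64_t sysid,
                           char *buf, size_t bufsize)
{
    return snprintf(buf, bufsize,
                    "pg_%" PRIu32 "_sync_%" PRIu32 "_%" PRIu64,
                    suboid, relid, sysid);
}
```

Called with `UINT32_MAX`, `UINT32_MAX` and `UINT64_MAX`, the sketch needs 3 + 10 + 6 + 10 + 1 + 20 = 50 characters, which stays below the `SKETCH_NAMEDATALEN - 1` limit of 63.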
                               1235                 : /*
 2208 peter_e                  1236 ECB             :  * Start syncing the table in the sync worker.
                               1237                 :  *
                               1238                 :  * If nothing needs to be done to sync the table, we exit the worker without
  906 alvherre                 1239                 :  * any further action.
                               1240                 :  *
                               1241                 :  * The returned slot name is palloc'ed in current memory context.
                               1242                 :  */
                               1243                 : char *
 2208 peter_e                  1244 GIC         156 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                               1245                 : {
 2153 bruce                    1246 ECB             :     char       *slotname;
                               1247                 :     char       *err;
                               1248                 :     char        relstate;
 2180 fujii                    1249                 :     XLogRecPtr  relstate_lsn;
  906 alvherre                 1250                 :     Relation    rel;
                               1251                 :     AclResult   aclresult;
                               1252                 :     WalRcvExecResult *res;
                               1253                 :     char        originname[NAMEDATALEN];
  786 akapila                  1254                 :     RepOriginId originid;
                               1255                 :     bool        must_use_password;
 2208 peter_e                  1256                 : 
                               1257                 :     /* Check the state of the table synchronization. */
 2208 peter_e                  1258 GBC         156 :     StartTransactionCommand();
 2180 fujii                    1259 GIC         156 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                               1260             156 :                                        MyLogicalRepWorker->relid,
                               1261                 :                                        &relstate_lsn);
 2180 fujii                    1262 CBC         156 :     CommitTransactionCommand();
                               1263                 : 
 2208 peter_e                  1264             156 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 2180 fujii                    1265 GIC         156 :     MyLogicalRepWorker->relstate = relstate;
 2180 fujii                    1266 CBC         156 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
 2208 peter_e                  1267             156 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1268                 : 
                               1269                 :     /*
  906 alvherre                 1270 ECB             :      * If synchronization is already done or no longer necessary, exit now
                               1271                 :      * that we've updated shared memory state.
                               1272                 :      */
  906 alvherre                 1273 GIC         156 :     switch (relstate)
  906 alvherre                 1274 ECB             :     {
  906 alvherre                 1275 UIC           0 :         case SUBREL_STATE_SYNCDONE:
  906 alvherre                 1276 ECB             :         case SUBREL_STATE_READY:
                               1277                 :         case SUBREL_STATE_UNKNOWN:
  906 alvherre                 1278 UIC           0 :             finish_sync_worker();   /* doesn't return */
                               1279                 :     }
                               1280                 : 
                               1281                 :     /* Calculate the name of the tablesync slot. */
  783 akapila                  1282 GIC         156 :     slotname = (char *) palloc(NAMEDATALEN);
                               1283             156 :     ReplicationSlotNameForTablesync(MySubscription->oid,
                               1284             156 :                                     MyLogicalRepWorker->relid,
                               1285                 :                                     slotname,
                               1286                 :                                     NAMEDATALEN);
                               1287                 : 
                               1288                 :     /* Is the use of a password mandatory? */
   10 rhaas                    1289 GNC         309 :     must_use_password = MySubscription->passwordrequired &&
                               1290             153 :         !superuser_arg(MySubscription->owner);
                               1291                 : 
                               1292                 :     /*
                               1293                 :      * Here we use the slot name instead of the subscription name as the
                               1294                 :      * application_name, so that it is different from the leader apply worker,
                               1295                 :      * so that synchronous replication can distinguish them.
                               1296                 :      */
  697 alvherre                 1297 GIC         156 :     LogRepWorkerWalRcvConn =
   10 rhaas                    1298 GNC         156 :         walrcv_connect(MySubscription->conninfo, true,
                               1299                 :                        must_use_password,
                               1300                 :                        slotname, &err);
  697 alvherre                 1301 GIC         156 :     if (LogRepWorkerWalRcvConn == NULL)
 2208 peter_e                  1302 UIC           0 :         ereport(ERROR,
  662 tgl                      1303 ECB             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1304                 :                  errmsg("could not connect to the publisher: %s", err)));
                               1305                 : 
  906 alvherre                 1306 CBC         156 :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
                               1307                 :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
  786 akapila                  1308 ECB             :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
                               1309                 : 
                               1310                 :     /* Assign the origin tracking record name. */
  180 akapila                  1311 GNC         156 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1312             156 :                                        MyLogicalRepWorker->relid,
                               1313                 :                                        originname,
                               1314                 :                                        sizeof(originname));
                               1315                 : 
  786 akapila                  1316 GIC         156 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
                               1317                 :     {
                               1318                 :         /*
  786 akapila                  1319 ECB             :          * We have previously errored out before finishing the copy so the
                               1320                 :          * replication slot might exist. We want to remove the slot if it
                               1321                 :          * already exists and proceed.
                               1322                 :          *
                               1323                 :          * XXX We could instead have tried to drop the slot when we failed
                               1324                 :          * last time, but for that we might have needed to clean up the copy
                               1325                 :          * state, as it might have been in the middle of fetching rows. Also,
                               1326                 :          * if there had been a network breakdown, the drop would not have
                               1327                 :          * succeeded, so trying it the next time seems like a better bet.
                               1328                 :          */
  697 alvherre                 1329 GIC           6 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
                               1330                 :     }
  786 akapila                  1331             150 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
                               1332                 :     {
  786 akapila                  1333 ECB             :         /*
                               1334                 :          * The COPY phase was previously done, but tablesync then crashed
                               1335                 :          * before it was able to finish normally.
                               1336                 :          */
  786 akapila                  1337 CBC           1 :         StartTransactionCommand();
                               1338                 : 
  786 akapila                  1339 ECB             :         /*
                               1340                 :          * The origin tracking name must already exist. It was created the
                               1341                 :          * first time this tablesync was launched.
                               1342                 :          */
  786 akapila                  1343 GIC           1 :         originid = replorigin_by_name(originname, false);
   90 akapila                  1344 GNC           1 :         replorigin_session_setup(originid, 0);
  786 akapila                  1345 GIC           1 :         replorigin_session_origin = originid;
                               1346               1 :         *origin_startpos = replorigin_session_get_progress(false);
                               1347                 : 
  786 akapila                  1348 CBC           1 :         CommitTransactionCommand();
                               1349                 : 
  786 akapila                  1350 GBC           1 :         goto copy_table_done;
                               1351                 :     }
                               1352                 : 
  906 alvherre                 1353             155 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
  906 alvherre                 1354 GIC         155 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
                               1355             155 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
                               1356             155 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
 2208 peter_e                  1357 ECB             : 
  906 alvherre                 1358                 :     /* Update the state and make it visible to others. */
  906 alvherre                 1359 CBC         155 :     StartTransactionCommand();
  906 alvherre                 1360 GIC         155 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1361             155 :                                MyLogicalRepWorker->relid,
                               1362             155 :                                MyLogicalRepWorker->relstate,
                               1363             155 :                                MyLogicalRepWorker->relstate_lsn);
  906 alvherre                 1364 CBC         155 :     CommitTransactionCommand();
  368 andres                   1365             155 :     pgstat_report_stat(true);
                               1366                 : 
  906 alvherre                 1367 GIC         155 :     StartTransactionCommand();
                               1368                 : 
                               1369                 :     /*
                               1370                 :      * Use a standard write lock here. It might be better to disallow access
                               1371                 :      * to the table while it's being synchronized. But we don't want to block
  906 alvherre                 1372 ECB             :      * the main apply process from working and it has to open the relation in
                               1373                 :      * RowExclusiveLock mode when remapping the remote relation id to the local one.
                               1374                 :      */
  906 alvherre                 1375 GIC         155 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
 2208 peter_e                  1376 ECB             : 
  457 jdavis                   1377 EUB             :     /*
                               1378                 :      * Check that our table sync worker has permission to insert into the
                               1379                 :      * target table.
                               1380                 :      */
  457 jdavis                   1381 CBC         155 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
                               1382                 :                                   ACL_INSERT);
  457 jdavis                   1383 GIC         155 :     if (aclresult != ACLCHECK_OK)
  457 jdavis                   1384 UIC           0 :         aclcheck_error(aclresult,
                               1385               0 :                        get_relkind_objtype(rel->rd_rel->relkind),
  457 jdavis                   1386 LBC           0 :                        RelationGetRelationName(rel));
  457 jdavis                   1387 ECB             : 
                               1388                 :     /*
                               1389                 :      * COPY FROM does not honor RLS policies.  That is not a problem for
                               1390                 :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
  332 tgl                      1391                 :      * who has it implicitly), but other roles should not be able to
                               1392                 :      * circumvent RLS.  Disallow logical replication into RLS enabled
                               1393                 :      * relations for such roles.
                               1394                 :      */
  457 jdavis                   1395 GIC         155 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
  457 jdavis                   1396 UIC           0 :         ereport(ERROR,
                               1397                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1398                 :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                               1399                 :                         GetUserNameFromId(GetUserId(), true),
                               1400                 :                         RelationGetRelationName(rel))));
                               1401                 : 
                               1402                 :     /*
                               1403                 :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
  906 alvherre                 1404 ECB             :      * ensures that both the replication slot we create (see below) and the
                               1405                 :      * COPY are consistent with each other.
                               1406                 :      */
  697 alvherre                 1407 GIC         155 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
                               1408                 :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                               1409                 :                       0, NULL);
  906                          1410             155 :     if (res->status != WALRCV_OK_COMMAND)
  906 alvherre                 1411 UIC           0 :         ereport(ERROR,
  662 tgl                      1412 ECB             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1413                 :                  errmsg("table copy could not start transaction on publisher: %s",
                               1414                 :                         res->err)));
  906 alvherre                 1415 GIC         155 :     walrcv_clear_result(res);
                               1416                 : 
                               1417                 :     /*
  786 akapila                  1418 ECB             :      * Create a new permanent logical decoding slot. This slot will be used
  906 alvherre                 1419                 :      * for the catchup phase after COPY is done, so tell it to use the
                               1420                 :      * snapshot to make the final data consistent.
                               1421                 :      */
  634 akapila                  1422 GIC         155 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
  634 akapila                  1423 ECB             :                        slotname, false /* permanent */ , false /* two_phase */ ,
                               1424                 :                        CRS_USE_SNAPSHOT, origin_startpos);
 2208 peter_e                  1425                 : 
                               1426                 :     /*
                               1427                 :      * Set up replication origin tracking. The purpose of doing this before the
  786 akapila                  1428                 :      * copy is to avoid doing the copy again due to any error in setting up
                               1429                 :      * origin tracking.
                               1430                 :      */
  786 akapila                  1431 CBC         155 :     originid = replorigin_by_name(originname, true);
  786 akapila                  1432 GIC         155 :     if (!OidIsValid(originid))
                               1433                 :     {
  786 akapila                  1434 ECB             :         /*
                               1435                 :          * Origin tracking does not exist, so create it now.
                               1436                 :          *
                               1437                 :          * Then advance to the LSN obtained from walrcv_create_slot. This is WAL
                               1438                 :          * logged for the purpose of recovery. Locks are to prevent the
                               1439                 :          * replication origin from vanishing while advancing.
                               1440                 :          */
  786 akapila                  1441 GIC         155 :         originid = replorigin_create(originname);
  786 akapila                  1442 ECB             : 
  786 akapila                  1443 GIC         155 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1444             155 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
                               1445                 :                            true /* go backward */ , true /* WAL log */ );
                               1446             155 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1447                 : 
   90 akapila                  1448 GNC         155 :         replorigin_session_setup(originid, 0);
  786 akapila                  1449 GIC         155 :         replorigin_session_origin = originid;
  786 akapila                  1450 ECB             :     }
                               1451                 :     else
                               1452                 :     {
  786 akapila                  1453 UIC           0 :         ereport(ERROR,
                               1454                 :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1455                 :                  errmsg("replication origin \"%s\" already exists",
  786 akapila                  1456 ECB             :                         originname)));
                               1457                 :     }
                               1458                 : 
  367 tomas.vondra             1459 EUB             :     /* Now do the initial data copy */
  367 tomas.vondra             1460 GBC         155 :     PushActiveSnapshot(GetTransactionSnapshot());
                               1461             155 :     copy_table(rel);
  367 tomas.vondra             1462 GIC         148 :     PopActiveSnapshot();
                               1463                 : 
  697 alvherre                 1464             148 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
  906                          1465             148 :     if (res->status != WALRCV_OK_COMMAND)
  906 alvherre                 1466 UIC           0 :         ereport(ERROR,
                               1467                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1468                 :                  errmsg("table copy could not finish transaction on publisher: %s",
                               1469                 :                         res->err)));
  906 alvherre                 1470 CBC         148 :     walrcv_clear_result(res);
 2208 peter_e                  1471 EUB             : 
  906 alvherre                 1472 GIC         148 :     table_close(rel, NoLock);
                               1473                 : 
                               1474                 :     /* Make the copy visible. */
                               1475             148 :     CommandCounterIncrement();
                               1476                 : 
                               1477                 :     /*
                               1478                 :      * Update the persisted state to indicate the COPY phase is done; make it
                               1479                 :      * visible to others.
                               1480                 :      */
  786 akapila                  1481             148 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
  786 akapila                  1482 CBC         148 :                                MyLogicalRepWorker->relid,
                               1483                 :                                SUBREL_STATE_FINISHEDCOPY,
  786 akapila                  1484 GIC         148 :                                MyLogicalRepWorker->relstate_lsn);
  786 akapila                  1485 ECB             : 
  786 akapila                  1486 GBC         148 :     CommitTransactionCommand();
                               1487                 : 
  786 akapila                  1488 GIC         149 : copy_table_done:
                               1489                 : 
  786 akapila                  1490 CBC         149 :     elog(DEBUG1,
                               1491                 :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
                               1492                 :          originname, LSN_FORMAT_ARGS(*origin_startpos));
                               1493                 : 
                               1494                 :     /*
                               1495                 :      * We are done with the initial data synchronization, update the state.
                               1496                 :      */
  906 alvherre                 1497             149 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
  906 alvherre                 1498 GIC         149 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
                               1499             149 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
                               1500             149 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1501                 : 
                               1502                 :     /*
                               1503                 :      * Finally, wait until the leader apply worker tells us to catch up and
                               1504                 :      * then return to let LogicalRepApplyLoop do it.
                               1505                 :      */
  906 alvherre                 1506 CBC         149 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
 2208 peter_e                  1507             149 :     return slotname;
                               1508                 : }
                               1509                 : 
                               1510                 : /*
                               1511                 :  * Common code to fetch the up-to-date sync state info into the static lists.
                               1512                 :  *
                               1513                 :  * Returns true if the subscription has one or more tables, else false.
                               1514                 :  *
                               1515                 :  * Note: If this function started the transaction (indicated by the parameter)
  634 akapila                  1516 ECB             :  * then it is the caller's responsibility to commit it.
                               1517                 :  */
                               1518                 : static bool
  634 akapila                  1519 CBC        2534 : FetchTableStates(bool *started_tx)
                               1520                 : {
  634 akapila                  1521 ECB             :     static bool has_subrels = false;
                               1522                 : 
  634 akapila                  1523 CBC        2534 :     *started_tx = false;
  634 akapila                  1524 ECB             : 
  634 akapila                  1525 GIC        2534 :     if (!table_states_valid)
                               1526                 :     {
                               1527                 :         MemoryContext oldctx;
  634 akapila                  1528 EUB             :         List       *rstates;
                               1529                 :         ListCell   *lc;
                               1530                 :         SubscriptionRelState *rstate;
                               1531                 : 
                               1532                 :         /* Clean the old lists. */
  634 akapila                  1533 GIC         664 :         list_free_deep(table_states_not_ready);
                               1534             664 :         table_states_not_ready = NIL;
  634 akapila                  1535 ECB             : 
  634 akapila                  1536 CBC         664 :         if (!IsTransactionState())
  634 akapila                  1537 ECB             :         {
  634 akapila                  1538 GIC         651 :             StartTransactionCommand();
  634 akapila                  1539 CBC         651 :             *started_tx = true;
  634 akapila                  1540 ECB             :         }
  634 akapila                  1541 EUB             : 
                               1542                 :         /* Fetch all non-ready tables. */
  256 michael                  1543 GNC         664 :         rstates = GetSubscriptionRelations(MySubscription->oid, true);
                               1544                 : 
  634 akapila                  1545 ECB             :         /* Allocate the tracking info in a permanent memory context. */
  634 akapila                  1546 GIC         664 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
  634 akapila                  1547 CBC        1855 :         foreach(lc, rstates)
                               1548                 :         {
  634 akapila                  1549 GIC        1191 :             rstate = palloc(sizeof(SubscriptionRelState));
  634 akapila                  1550 CBC        1191 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
  634 akapila                  1551 GIC        1191 :             table_states_not_ready = lappend(table_states_not_ready, rstate);
                               1552                 :         }
                               1553             664 :         MemoryContextSwitchTo(oldctx);
                               1554                 : 
                               1555                 :         /*
  634 akapila                  1556 ECB             :          * Does the subscription have tables?
                               1557                 :          *
                               1558                 :          * If there were not-READY relations found then we know it does. But
                               1559                 :          * if table_states_not_ready was empty we still need to check again to
                               1560                 :          * see if there are 0 tables.
                               1561                 :          */
  235 tgl                      1562 GNC         831 :         has_subrels = (table_states_not_ready != NIL) ||
  634 akapila                  1563 CBC         167 :             HasSubscriptionRelations(MySubscription->oid);
                               1564                 : 
                               1565             664 :         table_states_valid = true;
                               1566                 :     }
                               1567                 : 
  634 akapila                  1568 GIC        2534 :     return has_subrels;
                               1569                 : }
                               1570                 : 
                               1571                 : /*
  634 akapila                  1572 ECB             :  * If the subscription has no tables then return false.
                               1573                 :  *
                               1574                 :  * Otherwise, are all tablesyncs READY?
                               1575                 :  *
                               1576                 :  * Note: This function is not suitable to be called from outside of apply or
                               1577                 :  * tablesync workers because MySubscription needs to be already initialized.
                               1578                 :  */
                               1579                 : bool
  634 akapila                  1580 GIC          58 : AllTablesyncsReady(void)
  634 akapila                  1581 ECB             : {
  634 akapila                  1582 CBC          58 :     bool        started_tx = false;
  634 akapila                  1583 GIC          58 :     bool        has_subrels = false;
                               1584                 : 
                               1585                 :     /* We need up-to-date sync state info for subscription tables here. */
                               1586              58 :     has_subrels = FetchTableStates(&started_tx);
                               1587                 : 
                               1588              58 :     if (started_tx)
                               1589                 :     {
                               1590               9 :         CommitTransactionCommand();
  368 andres                   1591               9 :         pgstat_report_stat(true);
                               1592                 :     }
                               1593                 : 
  634 akapila                  1594 ECB             :     /*
                               1595                 :      * Return false when there are no tables in subscription or not all tables
                               1596                 :      * are in ready state; true otherwise.
                               1597                 :      */
  235 tgl                      1598 GNC          58 :     return has_subrels && (table_states_not_ready == NIL);
                               1599                 : }
  634 akapila                  1600 ECB             : 
                               1601                 : /*
                               1602                 :  * Update the two_phase state of the specified subscription in pg_subscription.
                               1603                 :  */
                               1604                 : void
  634 akapila                  1605 GIC           4 : UpdateTwoPhaseState(Oid suboid, char new_state)
                               1606                 : {
                               1607                 :     Relation    rel;
  634 akapila                  1608 ECB             :     HeapTuple   tup;
                               1609                 :     bool        nulls[Natts_pg_subscription];
                               1610                 :     bool        replaces[Natts_pg_subscription];
                               1611                 :     Datum       values[Natts_pg_subscription];
                               1612                 : 
  634 akapila                  1613 CBC           4 :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
  634 akapila                  1614 ECB             :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
                               1615                 :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
                               1616                 : 
  634 akapila                  1617 GIC           4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
  634 akapila                  1618 CBC           4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
  634 akapila                  1619 GIC           4 :     if (!HeapTupleIsValid(tup))
  634 akapila                  1620 UIC           0 :         elog(ERROR,
  634 akapila                  1621 ECB             :              "cache lookup failed for subscription oid %u",
                               1622                 :              suboid);
                               1623                 : 
                               1624                 :     /* Form a new tuple. */
  634 akapila                  1625 CBC           4 :     memset(values, 0, sizeof(values));
                               1626               4 :     memset(nulls, false, sizeof(nulls));
  634 akapila                  1627 GIC           4 :     memset(replaces, false, sizeof(replaces));
  634 akapila                  1628 ECB             : 
                               1629                 :     /* And update/set two_phase state */
  634 akapila                  1630 GIC           4 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
                               1631               4 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
                               1632                 : 
                               1633               4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
                               1634                 :                             values, nulls, replaces);
                               1635               4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                               1636                 : 
  634 akapila                  1637 CBC           4 :     heap_freetuple(tup);
                               1638               4 :     table_close(rel, RowExclusiveLock);
  634 akapila                  1639 GIC           4 : }
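
The caching pattern shared by FetchTableStates and AllTablesyncsReady above can be sketched in isolation. This is a minimal, standalone illustration under stated assumptions, not the PostgreSQL implementation: every Demo* and demo_* name is hypothetical, the catalog scan is replaced by a string holding one state character per table, and 'r' is assumed to mean "ready". The sketch keeps a validity flag, rescans only when the flag is cleared, and reports "all ready" only when the subscription has at least one table and none of them is still not ready.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the catalog: one state character per table. */
typedef struct DemoCatalog
{
    const char *states;     /* assumed: 'r' means ready, anything else not */
    size_t      ntables;
    int         scans;      /* counts how often the "catalog" was scanned */
} DemoCatalog;

/* Hypothetical cache, playing the role of the static state in the listing. */
typedef struct DemoCache
{
    bool        valid;      /* analogous to table_states_valid */
    size_t      not_ready;  /* analogous to the length of the not-ready list */
    bool        has_tables; /* analogous to has_subrels */
} DemoCache;

/* Rebuild the cached summary only if it has been invalidated. */
static void
demo_refresh(DemoCache *cache, DemoCatalog *cat)
{
    size_t      not_ready = 0;

    assert(cache != NULL && cat != NULL);

    if (cache->valid)
        return;

    for (size_t i = 0; i < cat->ntables; i++)
    {
        if (cat->states[i] != 'r')
            not_ready++;
    }
    cat->scans++;

    cache->not_ready = not_ready;
    /* Not-ready tables imply tables exist; otherwise check the total. */
    cache->has_tables = (not_ready > 0) || (cat->ntables > 0);
    cache->valid = true;
}

/* Mark the cache stale, as an invalidation callback would. */
static void
demo_invalidate(DemoCache *cache)
{
    cache->valid = false;
}

/* False for a subscription with no tables or with any not-ready table. */
static bool
demo_all_ready(DemoCache *cache, DemoCatalog *cat)
{
    demo_refresh(cache, cat);
    return cache->has_tables && cache->not_ready == 0;
}
```

Note that a subscription with zero tables is reported as not ready, matching the comment on AllTablesyncsReady. A changed catalog is only observed after demo_invalidate is called, which is the trade-off of caching behind a validity flag.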
        

Generated by: LCOV version v1.16-55-g56c0a2a