LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Coverage Total Hit LBC UIC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 93.0 % 487 453 10 24 8 274 43 128 26 297 18
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 16 16 16 15 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  * tablesync.c
       3                 :  *    PostgreSQL logical replication: initial table data synchronization
       4                 :  *
       5                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
       6                 :  *
       7                 :  * IDENTIFICATION
       8                 :  *    src/backend/replication/logical/tablesync.c
       9                 :  *
      10                 :  * NOTES
      11                 :  *    This file contains code for initial table data synchronization for
      12                 :  *    logical replication.
      13                 :  *
      14                 :  *    The initial data synchronization is done separately for each table,
      15                 :  *    in a separate apply worker that only fetches the initial snapshot data
      16                 :  *    from the publisher and then synchronizes the position in the stream with
      17                 :  *    the leader apply worker.
      18                 :  *
      19                 :  *    There are several reasons for doing the synchronization this way:
      20                 :  *     - It allows us to parallelize the initial data synchronization
      21                 :  *       which lowers the time needed for it to happen.
      22                 :  *     - The initial synchronization does not have to hold the xid and LSN
      23                 :  *       for the time it takes to copy data of all tables, causing less
      24                 :  *       bloat and lower disk consumption compared to doing the
      25                 :  *       synchronization in a single process for the whole database.
      26                 :  *     - It allows us to synchronize any tables added after the initial
      27                 :  *       synchronization has finished.
      28                 :  *
      29                 :  *    The stream position synchronization works in multiple steps:
      30                 :  *     - Apply worker requests a tablesync worker to start, setting the new
      31                 :  *       table state to INIT.
      32                 :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
      33                 :  *       copying.
      34                 :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
      35                 :  *       worker specific) state to indicate when the copy phase has completed, so
      36                 :  *       if the worker crashes with this (non-memory) state then the copy will not
      37                 :  *       be re-attempted.
      38                 :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
      39                 :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
      40                 :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
      41                 :  *       until either the table state is set to SYNCDONE or the sync worker
      42                 :  *       exits.
      43                 :  *     - After the sync worker has seen the state change to CATCHUP, it will
      44                 :  *       read the stream and apply changes (acting like an apply worker) until
      45                 :  *       it catches up to the specified stream position.  Then it sets the
      46                 :  *       state to SYNCDONE.  There might be zero changes applied between
      47                 :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      48                 :  *       apply worker.
      49                 :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
      50                 :  *       the table until it reaches the SYNCDONE stream position, at which
      51                 :  *       point it sets state to READY and stops tracking.  Again, there might
      52                 :  *       be zero changes in between.
      53                 :  *
      54                 :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
      55                 :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
      56                 :  *
      57                 :  *    The catalog pg_subscription_rel is used to keep information about
      58                 :  *    subscribed tables and their state.  The catalog holds all states
      59                 :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
      60                 :  *
      61                 :  *    Example flows look like this:
      62                 :  *     - Apply is in front:
      63                 :  *        sync:8
      64                 :  *          -> set in catalog FINISHEDCOPY
      65                 :  *          -> set in memory SYNCWAIT
      66                 :  *        apply:10
      67                 :  *          -> set in memory CATCHUP
      68                 :  *          -> enter wait-loop
      69                 :  *        sync:10
      70                 :  *          -> set in catalog SYNCDONE
      71                 :  *          -> exit
      72                 :  *        apply:10
      73                 :  *          -> exit wait-loop
      74                 :  *          -> continue rep
      75                 :  *        apply:11
      76                 :  *          -> set in catalog READY
      77                 :  *
      78                 :  *     - Sync is in front:
      79                 :  *        sync:10
      80                 :  *          -> set in catalog FINISHEDCOPY
      81                 :  *          -> set in memory SYNCWAIT
      82                 :  *        apply:8
      83                 :  *          -> set in memory CATCHUP
      84                 :  *          -> continue per-table filtering
      85                 :  *        sync:10
      86                 :  *          -> set in catalog SYNCDONE
      87                 :  *          -> exit
      88                 :  *        apply:10
      89                 :  *          -> set in catalog READY
      90                 :  *          -> stop per-table filtering
      91                 :  *          -> continue rep
      92                 :  *-------------------------------------------------------------------------
      93                 :  */
      94                 : 
      95                 : #include "postgres.h"
      96                 : 
      97                 : #include "access/table.h"
      98                 : #include "access/xact.h"
      99                 : #include "catalog/indexing.h"
     100                 : #include "catalog/pg_subscription_rel.h"
     101                 : #include "catalog/pg_type.h"
     102                 : #include "commands/copy.h"
     103                 : #include "miscadmin.h"
     104                 : #include "nodes/makefuncs.h"
     105                 : #include "parser/parse_relation.h"
     106                 : #include "pgstat.h"
     107                 : #include "replication/logicallauncher.h"
     108                 : #include "replication/logicalrelation.h"
     109                 : #include "replication/walreceiver.h"
     110                 : #include "replication/worker_internal.h"
     111                 : #include "replication/slot.h"
     112                 : #include "replication/origin.h"
     113                 : #include "storage/ipc.h"
     114                 : #include "storage/lmgr.h"
     115                 : #include "utils/acl.h"
     116                 : #include "utils/array.h"
     117                 : #include "utils/builtins.h"
     118                 : #include "utils/lsyscache.h"
     119                 : #include "utils/memutils.h"
     120                 : #include "utils/rls.h"
     121                 : #include "utils/snapmgr.h"
     122                 : #include "utils/syscache.h"
     123                 : 
     124                 : static bool table_states_valid = false;
     125                 : static List *table_states_not_ready = NIL;
     126                 : static bool FetchTableStates(bool *started_tx);
     127                 : 
     128                 : static StringInfo copybuf = NULL;
     129                 : 
     130                 : /*
     131                 :  * Exit routine for synchronization worker.
     132                 :  */
     133                 : static void
     134                 : pg_attribute_noreturn()
     135 GIC         148 : finish_sync_worker(void)
     136 ECB             : {
     137                 :     /*
     138                 :      * Commit any outstanding transaction. This is the usual case, unless
     139                 :      * there was nothing to do for the table.
     140                 :      */
     141 GIC         148 :     if (IsTransactionState())
     142 ECB             :     {
     143 GIC         148 :         CommitTransactionCommand();
     144 CBC         148 :         pgstat_report_stat(true);
     145 ECB             :     }
     146                 : 
     147                 :     /* And flush all writes. */
     148 GIC         148 :     XLogFlush(GetXLogWriteRecPtr());
     149 ECB             : 
     150 GIC         148 :     StartTransactionCommand();
     151 CBC         148 :     ereport(LOG,
     152 ECB             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
     153                 :                     MySubscription->name,
     154                 :                     get_rel_name(MyLogicalRepWorker->relid))));
     155 GIC         148 :     CommitTransactionCommand();
     156 ECB             : 
     157                 :     /* Find the leader apply worker and signal it. */
     158 GIC         148 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
     159 ECB             : 
     160                 :     /* Stop gracefully */
     161 GIC         148 :     proc_exit(0);
     162 ECB             : }
     163                 : 
     164                 : /*
     165                 :  * Wait until the relation sync state is set in the catalog to the expected
     166                 :  * one; return true when it happens.
     167                 :  *
     168                 :  * Returns false if the table sync worker or the table itself have
     169                 :  * disappeared, or the table state has been reset.
     170                 :  *
     171                 :  * Currently, this is used in the apply worker when transitioning from
     172                 :  * CATCHUP state to SYNCDONE.
     173                 :  */
     174                 : static bool
     175 GIC         145 : wait_for_relation_state_change(Oid relid, char expected_state)
     176 ECB             : {
     177                 :     char        state;
     178                 : 
     179                 :     for (;;)
     180 GIC         147 :     {
     181 ECB             :         LogicalRepWorker *worker;
     182                 :         XLogRecPtr  statelsn;
     183                 : 
     184 GIC         292 :         CHECK_FOR_INTERRUPTS();
     185 ECB             : 
     186 GIC         292 :         InvalidateCatalogSnapshot();
     187 CBC         292 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     188 ECB             :                                         relid, &statelsn);
     189                 : 
     190 GIC         292 :         if (state == SUBREL_STATE_UNKNOWN)
     191 LBC           0 :             break;
     192 EUB             : 
     193 GIC         292 :         if (state == expected_state)
     194 CBC         145 :             return true;
     195 ECB             : 
     196                 :         /* Check if the sync worker is still running and bail if not. */
     197 GIC         147 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     198 CBC         147 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
     199 ECB             :                                         false);
     200 GIC         147 :         LWLockRelease(LogicalRepWorkerLock);
     201 CBC         147 :         if (!worker)
     202 LBC           0 :             break;
     203 EUB             : 
     204 GIC         147 :         (void) WaitLatch(MyLatch,
     205 ECB             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     206                 :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     207                 : 
     208 GIC         147 :         ResetLatch(MyLatch);
     209 ECB             :     }
     210                 : 
     211 UIC           0 :     return false;
     212 EUB             : }
     213                 : 
     214                 : /*
     215                 :  * Wait until the apply worker changes the state of our synchronization
     216                 :  * worker to the expected one.
     217                 :  *
     218                 :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     219                 :  *
     220                 :  * Returns false if the apply worker has disappeared.
     221                 :  */
     222                 : static bool
     223 GIC         149 : wait_for_worker_state_change(char expected_state)
     224 ECB             : {
     225                 :     int         rc;
     226                 : 
     227                 :     for (;;)
     228 GIC         149 :     {
     229 ECB             :         LogicalRepWorker *worker;
     230                 : 
     231 GIC         298 :         CHECK_FOR_INTERRUPTS();
     232 ECB             : 
     233                 :         /*
     234                 :          * Done if already in correct state.  (We assume this fetch is atomic
     235                 :          * enough to not give a misleading answer if we do it with no lock.)
     236                 :          */
     237 GIC         298 :         if (MyLogicalRepWorker->relstate == expected_state)
     238 CBC         149 :             return true;
     239 ECB             : 
     240                 :         /*
     241                 :          * Bail out if the apply worker has died, else signal it we're
     242                 :          * waiting.
     243                 :          */
     244 GIC         149 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     245 CBC         149 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     246 ECB             :                                         InvalidOid, false);
     247 GIC         149 :         if (worker && worker->proc)
     248 CBC         149 :             logicalrep_worker_wakeup_ptr(worker);
     249             149 :         LWLockRelease(LogicalRepWorkerLock);
     250             149 :         if (!worker)
     251 LBC           0 :             break;
     252 EUB             : 
     253                 :         /*
     254                 :          * Wait.  We expect to get a latch signal back from the apply worker,
     255                 :          * but use a timeout in case it dies without sending one.
     256                 :          */
     257 GIC         149 :         rc = WaitLatch(MyLatch,
     258 ECB             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     259                 :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     260                 : 
     261 GIC         149 :         if (rc & WL_LATCH_SET)
     262 CBC         149 :             ResetLatch(MyLatch);
     263 ECB             :     }
     264                 : 
     265 UIC           0 :     return false;
     266 EUB             : }
     267                 : 
     268                 : /*
     269                 :  * Callback from syscache invalidation.
     270                 :  */
     271                 : void
     272 GIC        1038 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
     273 ECB             : {
     274 GIC        1038 :     table_states_valid = false;
     275 CBC        1038 : }
     276 ECB             : 
     277                 : /*
     278                 :  * Handle table synchronization cooperation from the synchronization
     279                 :  * worker.
     280                 :  *
     281                 :  * If the sync worker is in CATCHUP state and reached (or passed) the
     282                 :  * predetermined synchronization point in the WAL stream, mark the table as
     283                 :  * SYNCDONE and finish.
     284                 :  */
     285                 : static void
     286 GIC         170 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
     287 ECB             : {
     288 GIC         170 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     289 ECB             : 
     290 GIC         170 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     291 CBC         170 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     292 ECB             :     {
     293                 :         TimeLineID  tli;
     294 GIC         149 :         char        syncslotname[NAMEDATALEN] = {0};
     295 GNC         149 :         char        originname[NAMEDATALEN] = {0};
     296 ECB             : 
     297 CBC         149 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     298 GIC         149 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     299 ECB             : 
     300 CBC         149 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     301                 : 
     302 ECB             :         /*
     303                 :          * UpdateSubscriptionRelState must be called within a transaction.
     304                 :          */
     305 GIC         149 :         if (!IsTransactionState())
     306 CBC         149 :             StartTransactionCommand();
     307 ECB             : 
     308 GIC         149 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     309 CBC         149 :                                    MyLogicalRepWorker->relid,
     310             149 :                                    MyLogicalRepWorker->relstate,
     311             149 :                                    MyLogicalRepWorker->relstate_lsn);
     312 ECB             : 
     313                 :         /*
     314                 :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
     315                 :          * the slot.
     316                 :          */
     317 GIC         149 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
     318 ECB             : 
     319                 :         /*
     320                 :          * Cleanup the tablesync slot.
     321                 :          *
     322                 :          * This has to be done after updating the state because otherwise, if
     323                 :          * there is an error while doing the database operations, we won't be
     324                 :          * able to roll back the dropped slot.
     325                 :          */
     326 GIC         148 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
     327 CBC         148 :                                         MyLogicalRepWorker->relid,
     328 ECB             :                                         syncslotname,
     329                 :                                         sizeof(syncslotname));
     330                 : 
     331                 :         /*
     332                 :          * It is important to give an error if we are unable to drop the slot;
     333                 :          * otherwise, it won't be dropped until the corresponding subscription
     334                 :          * is dropped.  So pass missing_ok = false.
     335                 :          */
     336 GIC         148 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
     337 ECB             : 
     338 GNC         148 :         CommitTransactionCommand();
     339             148 :         pgstat_report_stat(false);
     340                 : 
     341                 :         /*
     342                 :          * Start a new transaction to clean up the tablesync origin tracking.
     343                 :          * This transaction will be ended within finish_sync_worker().
     344                 :          * Even if we fail to remove the origin here, the apply worker will
     345                 :          * make sure to clean it up afterward.
     346                 :          *
     347                 :          * We need to do this after the table state is set to SYNCDONE.
     348                 :          * Otherwise, if an error occurs while performing the database
     349                 :          * operation, the worker will be restarted, but the in-memory state of
     350                 :          * replication progress (remote_lsn), which would have been cleared
     351                 :          * before the restart, won't be rolled back. The restarted worker would
     352                 :          * then use invalid replication progress state, resulting in replay of
     353                 :          * transactions that have already been applied.
     354                 :          */
     355             148 :         StartTransactionCommand();
     356                 : 
     357             148 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     358             148 :                                            MyLogicalRepWorker->relid,
     359                 :                                            originname,
     360                 :                                            sizeof(originname));
     361                 : 
     362                 :         /*
     363                 :          * Resetting the origin session removes the ownership of the slot.
     364                 :          * This is needed to allow the origin to be dropped.
     365                 :          */
     366             148 :         replorigin_session_reset();
     367             148 :         replorigin_session_origin = InvalidRepOriginId;
     368             148 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
     369             148 :         replorigin_session_origin_timestamp = 0;
     370                 : 
     371                 :         /*
     372                 :          * Drop the tablesync's origin tracking if it exists.
     373                 :          *
     374                 :          * There is a chance that the user is concurrently performing a refresh
     375                 :          * of the subscription, which removes the table state and its origin,
     376                 :          * or that the apply worker has already removed this origin.  So pass
     377                 :          * missing_ok = true.
     378                 :          */
     379             148 :         replorigin_drop_by_name(originname, true, false);
     380                 : 
     381 GIC         148 :         finish_sync_worker();
     382 ECB             :     }
     383                 :     else
     384 GIC          21 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     385              21 : }
     386                 : 
     387                 : /*
     388                 :  * Handle table synchronization cooperation from the apply worker.
     389                 :  *
     390                 :  * Walk over all subscription tables that are individually tracked by the
     391                 :  * apply process (currently, all that have state other than
     392                 :  * SUBREL_STATE_READY) and manage synchronization for them.
     393                 :  *
     394                 :  * If there are tables that need synchronizing and are not being synchronized
     395                 :  * yet, start sync workers for them (if there are free slots for sync
     396                 :  * workers).  To prevent starting the sync worker for the same relation at a
     397                 :  * high frequency after a failure, we store its last start time with each sync
     398                 :  * state info.  We start the sync worker for the same relation after waiting
     399 ECB             :  * at least wal_retrieve_retry_interval.
     400                 :  *
     401                 :  * For tables that are being synchronized already, check if sync workers
     402                 :  * either need action from the apply worker or have finished.  This is the
     403                 :  * SYNCWAIT to CATCHUP transition.
     404                 :  *
     405                 :  * If the synchronization position is reached (SYNCDONE), then the table can
     406                 :  * be marked as READY and is no longer tracked.
     407                 :  */
     408                 : static void
     409 GIC        2476 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
     410 ECB             : {
     411                 :     struct tablesync_start_time_mapping
     412                 :     {
     413                 :         Oid         relid;
     414                 :         TimestampTz last_start_time;
     415                 :     };
     416                 :     static HTAB *last_start_times = NULL;
     417                 :     ListCell   *lc;
     418 GIC        2476 :     bool        started_tx = false;
     419 GNC        2476 :     bool        should_exit = false;
     420                 : 
     421 GIC        2476 :     Assert(!IsTransactionState());
     422                 : 
     423                 :     /* We need up-to-date sync state info for subscription tables here. */
     424 CBC        2476 :     FetchTableStates(&started_tx);
     425                 : 
     426 ECB             :     /*
     427                 :      * Prepare a hash table for tracking last start times of workers, to avoid
     428                 :      * immediate restarts.  We don't need it if there are no tables that need
     429                 :      * syncing.
     430                 :      */
     431 GNC        2476 :     if (table_states_not_ready != NIL && !last_start_times)
     432 GIC          88 :     {
     433                 :         HASHCTL     ctl;
     434                 : 
     435              88 :         ctl.keysize = sizeof(Oid);
     436              88 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     437              88 :         last_start_times = hash_create("Logical replication table sync worker start times",
     438                 :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     439                 :     }
     440                 : 
     441                 :     /*
     442                 :      * Clean up the hash table when we're done with all tables (just to
     443                 :      * release the bit of memory).
     444                 :      */
     445 GNC        2388 :     else if (table_states_not_ready == NIL && last_start_times)
     446                 :     {
     447 GIC          68 :         hash_destroy(last_start_times);
     448              68 :         last_start_times = NULL;
     449                 :     }
     450                 : 
     451                 :     /*
     452                 :      * Process all tables that are being synchronized.
     453                 :      */
     454 CBC        3865 :     foreach(lc, table_states_not_ready)
     455 ECB             :     {
     456 GIC        1389 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     457                 : 
     458 CBC        1389 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     459 ECB             :         {
     460                 :             /*
     461                 :              * Apply has caught up to the position where the table sync has
     462                 :              * finished.  Mark the table as ready so that the apply will just
     463                 :              * continue to replicate it normally.
     464                 :              */
     465 GIC         144 :             if (current_lsn >= rstate->lsn)
     466                 :             {
     467                 :                 char        originname[NAMEDATALEN];
     468 ECB             : 
     469 GIC         144 :                 rstate->state = SUBREL_STATE_READY;
     470 CBC         144 :                 rstate->lsn = current_lsn;
     471             144 :                 if (!started_tx)
     472                 :                 {
     473 GIC           2 :                     StartTransactionCommand();
     474               2 :                     started_tx = true;
     475                 :                 }
     476                 : 
     477 ECB             :                 /*
     478                 :                  * Remove the tablesync origin tracking if it exists.
     479                 :                  *
     480                 :                  * There is a chance that the user is concurrently performing
     481                 :                  * a refresh of the subscription, which removes the table
     482                 :                  * state and its origin, or that the tablesync worker has
     483                 :                  * already removed this origin.  We can't rely on the tablesync
     484                 :                  * worker to remove the origin tracking because, if there is
     485                 :                  * an error while dropping it, we won't restart the worker to
     486                 :                  * drop the origin.  So pass missing_ok = true.
     487                 :                  */
     488 GNC         144 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
     489                 :                                                    rstate->relid,
     490                 :                                                    originname,
     491                 :                                                    sizeof(originname));
     492 CBC         144 :                 replorigin_drop_by_name(originname, true, false);
     493                 : 
     494 ECB             :                 /*
     495                 :                  * Update the state to READY only after the origin cleanup.
     496                 :                  */
     497 GIC         144 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
     498             144 :                                            rstate->relid, rstate->state,
     499                 :                                            rstate->lsn);
     500                 :             }
     501                 :         }
     502                 :         else
     503                 :         {
     504                 :             LogicalRepWorker *syncworker;
     505                 : 
     506                 :             /*
     507                 :              * Look for a sync worker for this relation.
     508                 :              */
     509 CBC        1245 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     510                 : 
     511 GIC        1245 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     512                 :                                                 rstate->relid, false);
     513 ECB             : 
     514 GIC        1245 :             if (syncworker)
     515                 :             {
     516                 :                 /* Found one, update our copy of its state */
     517             542 :                 SpinLockAcquire(&syncworker->relmutex);
     518 CBC         542 :                 rstate->state = syncworker->relstate;
     519             542 :                 rstate->lsn = syncworker->relstate_lsn;
     520 GIC         542 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     521                 :                 {
     522                 :                     /*
     523                 :                      * Sync worker is waiting for apply.  Tell sync worker it
     524                 :                      * can catchup now.
     525                 :                      */
     526             145 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     527             145 :                     syncworker->relstate_lsn =
     528             145 :                         Max(syncworker->relstate_lsn, current_lsn);
     529                 :                 }
     530 CBC         542 :                 SpinLockRelease(&syncworker->relmutex);
     531                 : 
     532 ECB             :                 /* If we told worker to catch up, wait for it. */
     533 GIC         542 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     534                 :                 {
     535 ECB             :                     /* Signal the sync worker, as it may be waiting for us. */
     536 GIC         145 :                     if (syncworker->proc)
     537             145 :                         logicalrep_worker_wakeup_ptr(syncworker);
     538 ECB             : 
     539                 :                     /* Now safe to release the LWLock */
     540 CBC         145 :                     LWLockRelease(LogicalRepWorkerLock);
     541 ECB             : 
     542                 :                     /*
     543                 :                      * Enter busy loop and wait for synchronization worker to
     544                 :                      * reach expected state (or die trying).
     545                 :                      */
     546 GIC         145 :                     if (!started_tx)
     547 ECB             :                     {
     548 LBC           0 :                         StartTransactionCommand();
     549               0 :                         started_tx = true;
     550                 :                     }
     551 ECB             : 
     552 GIC         145 :                     wait_for_relation_state_change(rstate->relid,
     553                 :                                                    SUBREL_STATE_SYNCDONE);
     554 ECB             :                 }
     555                 :                 else
     556 GIC         397 :                     LWLockRelease(LogicalRepWorkerLock);
     557 ECB             :             }
     558                 :             else
     559                 :             {
     560                 :                 /*
     561                 :                  * If there is no sync worker for this table yet, count
     562                 :                  * running sync workers for this subscription, while we have
     563                 :                  * the lock.
     564                 :                  */
     565                 :                 int         nsyncworkers =
     566 GIC         703 :                 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     567 ECB             : 
     568                 :                 /* Now safe to release the LWLock */
     569 GBC         703 :                 LWLockRelease(LogicalRepWorkerLock);
     570 EUB             : 
     571                 :                 /*
     572                 :                  * If there are free sync worker slot(s), start a new sync
     573 ECB             :                  * worker for the table.
     574                 :                  */
     575 GIC         703 :                 if (nsyncworkers < max_sync_workers_per_subscription)
     576                 :                 {
     577 CBC         159 :                     TimestampTz now = GetCurrentTimestamp();
     578                 :                     struct tablesync_start_time_mapping *hentry;
     579                 :                     bool        found;
     580                 : 
     581 GIC         159 :                     hentry = hash_search(last_start_times, &rstate->relid,
     582                 :                                          HASH_ENTER, &found);
     583                 : 
     584             170 :                     if (!found ||
     585              11 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
     586                 :                                                    wal_retrieve_retry_interval))
     587 ECB             :                     {
     588 GIC         153 :                         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
     589             153 :                                                  MySubscription->oid,
     590 CBC         153 :                                                  MySubscription->name,
     591 GIC         153 :                                                  MyLogicalRepWorker->userid,
     592                 :                                                  rstate->relid,
     593                 :                                                  DSM_HANDLE_INVALID);
     594             153 :                         hentry->last_start_time = now;
     595                 :                     }
     596                 :                 }
     597 ECB             :             }
     598                 :         }
     599                 :     }
     600                 : 
     601 GIC        2476 :     if (started_tx)
     602                 :     {
     603                 :         /*
     604                 :          * Even when the two_phase mode is requested by the user, it remains
     605                 :          * as 'pending' until all tablesyncs have reached READY state.
     606                 :          *
     607                 :          * When this happens, we restart the apply worker and (if the
     608                 :          * conditions are still ok) then the two_phase tri-state will become
     609                 :          * 'enabled' at that time.
     610                 :          *
     611                 :          * Note: If the subscription has no tables then leave the state as
     612                 :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
     613                 :          * work.
     614                 :          */
     615 GNC         644 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
     616                 :         {
     617              22 :             CommandCounterIncrement();  /* make updates visible */
     618              22 :             if (AllTablesyncsReady())
     619                 :             {
     620               5 :                 ereport(LOG,
     621                 :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
     622                 :                                 MySubscription->name)));
     623               5 :                 should_exit = true;
     624                 :             }
     625                 :         }
     626                 : 
     627 CBC         644 :         CommitTransactionCommand();
     628 GIC         644 :         pgstat_report_stat(true);
     629                 :     }
     630                 : 
     631 GNC        2476 :     if (should_exit)
     632                 :     {
     633                 :         /*
     634                 :          * Reset the last-start time for this worker so that the launcher will
     635                 :          * restart it without waiting for wal_retrieve_retry_interval.
     636                 :          */
     637               5 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
     638                 : 
     639               5 :         proc_exit(0);
     640                 :     }
     641 CBC        2471 : }
     642 ECB             : 
     643                 : /*
     644                 :  * Process possible state change(s) of tables that are being synchronized.
                         :  *
                         :  * This is only a dispatcher: the work is delegated according to the kind of
                         :  * worker we are running in, and current_lsn is handed to the chosen routine
                         :  * unchanged.  A parallel apply worker returns without doing anything, a
                         :  * tablesync worker calls process_syncing_tables_for_sync(), and any other
                         :  * caller (presumably the leader apply worker) calls
                         :  * process_syncing_tables_for_apply().
     645                 :  */
     646                 : void
     647 CBC        2668 : process_syncing_tables(XLogRecPtr current_lsn)
     648 ECB             : {
     649                 :     /*
     650                 :      * Skip for parallel apply workers because they only operate on tables
     651                 :      * that are in a READY state. See pa_can_start() and
     652                 :      * should_apply_changes_for_rel().
     653                 :      */
     654 GNC        2668 :     if (am_parallel_apply_worker())
     655              22 :         return;
     656                 : 
     657 GIC        2646 :     if (am_tablesync_worker())
     658             170 :         process_syncing_tables_for_sync(current_lsn);
     659 ECB             :     else
     660 GIC        2476 :         process_syncing_tables_for_apply(current_lsn);
     661                 : }
     662                 : 
     663                 : /*
     664                 :  * Create list of columns for COPY based on logical relation mapping.
                         :  *
                         :  * Walks rel->remoterel.attnames[0 .. natts - 1] in order and appends one
                         :  * makeString() node per name, so the result lists the remote column names
                         :  * in the same order as they are stored in rel->remoterel.  Returns NIL
                         :  * when rel->remoterel.natts is zero, since nothing is appended.
                         :  *
                         :  * NOTE(review): the name pointers are handed to makeString() as-is;
                         :  * presumably the nodes reference rel's strings rather than copying them,
                         :  * so the list should not outlive rel -- confirm against makeString().
     665                 :  */
     666 ECB             : static List *
     667 GIC         155 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     668                 : {
     669             155 :     List       *attnamelist = NIL;
     670                 :     int         i;
     671                 : 
     672             410 :     for (i = 0; i < rel->remoterel.natts; i++)
     673                 :     {
     674             255 :         attnamelist = lappend(attnamelist,
     675             255 :                               makeString(rel->remoterel.attnames[i]));
     676                 :     }
     677                 : 
     678                 : 
     679             155 :     return attnamelist;
     680 ECB             : }
     681                 : 
     682                 : /*
     683                 :  * Data source callback for the COPY FROM, which reads from the remote
     684                 :  * connection and passes the data back to our local COPY.
                         :  *
                         :  * outbuf is the destination buffer; at most maxread bytes are written to
                         :  * it.  The function keeps receiving until at least minread bytes have been
                         :  * delivered, maxread is exhausted, or walrcv_receive() returns a negative
                         :  * length (presumably the end of the remote COPY stream), whichever comes
                         :  * first.  Received bytes that do not fit stay in copybuf and are served
                         :  * first on the next call.
                         :  *
                         :  * Returns the number of bytes written to outbuf, which can be less than
                         :  * minread only when walrcv_receive() reported a negative length.
     685                 :  */
     686                 : static int
     687 GIC       13865 : copy_read_data(void *outbuf, int minread, int maxread)
     688 ECB             : {
     689 GIC       13865 :     int         bytesread = 0;
     690                 :     int         avail;
     691                 : 
     692 ECB             :     /* If there is leftover data from a previous read, use it first. */
     693 CBC       13865 :     avail = copybuf->len - copybuf->cursor;
     694 GIC       13865 :     if (avail)
     695                 :     {
     696 LBC           0 :         if (avail > maxread)
     697 UIC           0 :             avail = maxread;
     698               0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                         :         /*
                         :          * Advance the write position, exactly as the receive path below
                         :          * does.  Without this, a following read (needed whenever the
                         :          * leftover is shorter than minread) would overwrite the bytes just
                         :          * copied while bytesread still counted them.
                         :          */
                         :         outbuf = (void *) ((char *) outbuf + avail);
     699               0 :         copybuf->cursor += avail;
     700               0 :         maxread -= avail;
     701               0 :         bytesread += avail;
     702 ECB             :     }
     703                 : 
     704 CBC       13866 :     while (maxread > 0 && bytesread < minread)
     705                 :     {
     706           13866 :         pgsocket    fd = PGINVALID_SOCKET;
     707                 :         int         len;
     708 GIC       13866 :         char       *buf = NULL;
     709                 : 
     710                 :         for (;;)
     711                 :         {
     712 ECB             :             /* Try to read the data. */
     713 GIC       13866 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
     714                 : 
     715           13866 :             CHECK_FOR_INTERRUPTS();
     716                 : 
     717           13866 :             if (len == 0)
     718               1 :                 break;
     719 CBC       13865 :             else if (len < 0)
     720           13865 :                 return bytesread;
     721                 :             else
     722 ECB             :             {
     723                 :                 /* Process the data: copybuf now describes the new chunk */
     724 GIC       13712 :                 copybuf->data = buf;
     725 CBC       13712 :                 copybuf->len = len;
     726 GIC       13712 :                 copybuf->cursor = 0;
     727                 : 
     728           13712 :                 avail = copybuf->len - copybuf->cursor;
     729           13712 :                 if (avail > maxread)
     730 UIC           0 :                     avail = maxread;
     731 GIC       13712 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     732 CBC       13712 :                 outbuf = (void *) ((char *) outbuf + avail);
     733 GIC       13712 :                 copybuf->cursor += avail;
     734 CBC       13712 :                 maxread -= avail;
     735 GIC       13712 :                 bytesread += avail;
     736                 :             }
     737 ECB             : 
     738 GIC       13712 :             if (maxread <= 0 || bytesread >= minread)
     739 CBC       13712 :                 return bytesread;
     740 ECB             :         }
     741                 : 
     742                 :         /*
     743                 :          * Wait for more data or latch.  WL_TIMEOUT with 1000L bounds each
                         :          * wait, after which the outer loop tries to receive again.
     744                 :          */
     745 GIC           1 :         (void) WaitLatchOrSocket(MyLatch,
     746                 :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
     747                 :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     748                 :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     749                 : 
     750               1 :         ResetLatch(MyLatch);
     751                 :     }
     752 ECB             : 
     753 UIC           0 :     return bytesread;
     754 ECB             : }
     755                 : 
     756                 : 
     757                 : /*
     758                 :  * Get information about the remote relation, similar to what the RELATION
     759                 :  * message provides during replication. This function also returns the relation
     760                 :  * qualifications to be used in the COPY command.
     761 EUB             :  */
     762                 : static void
     763 GBC         155 : fetch_remote_table_info(char *nspname, char *relname,
     764 EUB             :                         LogicalRepRelation *lrel, List **qual)
     765                 : {
     766                 :     WalRcvExecResult *res;
     767                 :     StringInfoData cmd;
     768                 :     TupleTableSlot *slot;
     769 CBC         155 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
     770 GIC         155 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
     771 CBC         155 :     Oid         qualRow[] = {TEXTOID};
     772                 :     bool        isnull;
     773 ECB             :     int         natt;
     774                 :     ListCell   *lc;
     775 GIC         155 :     Bitmapset  *included_cols = NULL;
     776                 : 
     777             155 :     lrel->nspname = nspname;
     778 CBC         155 :     lrel->relname = relname;
     779                 : 
     780 ECB             :     /* First fetch Oid and replica identity. */
     781 GIC         155 :     initStringInfo(&cmd);
     782 CBC         155 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
     783 ECB             :                      "  FROM pg_catalog.pg_class c"
     784                 :                      "  INNER JOIN pg_catalog.pg_namespace n"
     785                 :                      "        ON (c.relnamespace = n.oid)"
     786                 :                      " WHERE n.nspname = %s"
     787                 :                      "   AND c.relname = %s",
     788                 :                      quote_literal_cstr(nspname),
     789                 :                      quote_literal_cstr(relname));
     790 CBC         155 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     791 ECB             :                       lengthof(tableRow), tableRow);
     792                 : 
     793 CBC         155 :     if (res->status != WALRCV_OK_TUPLES)
     794 LBC           0 :         ereport(ERROR,
     795 EUB             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     796 ECB             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     797                 :                         nspname, relname, res->err)));
     798                 : 
     799 CBC         155 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     800             155 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     801 UIC           0 :         ereport(ERROR,
     802                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     803 ECB             :                  errmsg("table \"%s.%s\" not found on publisher",
     804                 :                         nspname, relname)));
     805                 : 
     806 GIC         155 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     807             155 :     Assert(!isnull);
     808             155 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     809             155 :     Assert(!isnull);
     810 CBC         155 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
     811 GIC         155 :     Assert(!isnull);
     812                 : 
     813             155 :     ExecDropSingleTupleTableSlot(slot);
     814             155 :     walrcv_clear_result(res);
     815 ECB             : 
     816                 : 
     817                 :     /*
     818 EUB             :      * Get column lists for each relation.
     819                 :      *
     820                 :      * We need to do this before fetching info about column names and types,
     821                 :      * so that we can skip columns that should not be replicated.
     822                 :      */
     823 GIC         155 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
     824                 :     {
     825                 :         WalRcvExecResult *pubres;
     826                 :         TupleTableSlot *tslot;
     827             155 :         Oid         attrsRow[] = {INT2VECTOROID};
     828 ECB             :         StringInfoData pub_names;
     829                 : 
     830 GIC         155 :         initStringInfo(&pub_names);
     831             495 :         foreach(lc, MySubscription->publications)
     832                 :         {
     833             340 :             if (foreach_current_index(lc) > 0)
     834 GNC         185 :                 appendStringInfoString(&pub_names, ", ");
     835 CBC         340 :             appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
     836 ECB             :         }
     837                 : 
     838                 :         /*
     839                 :          * Fetch info about column lists for the relation (from all the
     840                 :          * publications).
     841                 :          */
     842 GIC         155 :         resetStringInfo(&cmd);
     843 CBC         155 :         appendStringInfo(&cmd,
     844 ECB             :                          "SELECT DISTINCT"
     845                 :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
     846                 :                          "   THEN NULL ELSE gpt.attrs END)"
     847                 :                          "  FROM pg_publication p,"
     848                 :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
     849                 :                          "  pg_class c"
     850                 :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
     851                 :                          "   AND p.pubname IN ( %s )",
     852                 :                          lrel->remoteid,
     853                 :                          pub_names.data);
     854                 : 
     855 GIC         155 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     856 ECB             :                              lengthof(attrsRow), attrsRow);
     857                 : 
     858 GIC         155 :         if (pubres->status != WALRCV_OK_TUPLES)
     859 LBC           0 :             ereport(ERROR,
     860 EUB             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     861                 :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
     862                 :                             nspname, relname, pubres->err)));
     863                 : 
     864                 :         /*
     865 ECB             :          * We don't support the case where the column list is different for
     866                 :          * the same table when combining publications. See comments atop
     867 EUB             :          * fetch_table_list. So there should be only one row returned.
     868                 :          * Although we already checked this when creating the subscription, we
     869                 :          * still need to check here in case the column list was changed after
     870                 :          * creating the subscription and before the sync worker is started.
     871                 :          */
     872 CBC         155 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
     873 LBC           0 :             ereport(ERROR,
     874 ECB             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     875                 :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
     876                 :                            nspname, relname));
     877                 : 
     878                 :         /*
     879                 :          * Get the column list and build a single bitmap with the attnums.
     880                 :          *
     881                 :          * If we find a NULL value, it means all the columns should be
     882                 :          * replicated.
     883                 :          */
     884 GIC         155 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
     885             155 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
     886                 :         {
     887             155 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
     888                 : 
     889 CBC         155 :             if (!isnull)
     890                 :             {
     891                 :                 ArrayType  *arr;
     892                 :                 int         nelems;
     893 ECB             :                 int16      *elems;
     894                 : 
     895 GIC          19 :                 arr = DatumGetArrayTypeP(cfval);
     896 CBC          19 :                 nelems = ARR_DIMS(arr)[0];
     897              19 :                 elems = (int16 *) ARR_DATA_PTR(arr);
     898                 : 
     899              53 :                 for (natt = 0; natt < nelems; natt++)
     900              34 :                     included_cols = bms_add_member(included_cols, elems[natt]);
     901 ECB             :             }
     902                 : 
     903 GIC         155 :             ExecClearTuple(tslot);
     904                 :         }
     905             155 :         ExecDropSingleTupleTableSlot(tslot);
     906                 : 
     907             155 :         walrcv_clear_result(pubres);
     908 ECB             : 
     909 CBC         155 :         pfree(pub_names.data);
     910                 :     }
     911                 : 
     912                 :     /*
     913                 :      * Now fetch column names and types.
     914                 :      */
     915 GIC         155 :     resetStringInfo(&cmd);
     916             155 :     appendStringInfo(&cmd,
     917                 :                      "SELECT a.attnum,"
     918                 :                      "       a.attname,"
     919                 :                      "       a.atttypid,"
     920                 :                      "       a.attnum = ANY(i.indkey)"
     921 ECB             :                      "  FROM pg_catalog.pg_attribute a"
     922                 :                      "  LEFT JOIN pg_catalog.pg_index i"
     923                 :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     924                 :                      " WHERE a.attnum > 0::pg_catalog.int2"
     925 EUB             :                      "   AND NOT a.attisdropped %s"
     926                 :                      "   AND a.attrelid = %u"
     927                 :                      " ORDER BY a.attnum",
     928                 :                      lrel->remoteid,
     929 GIC         155 :                      (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
     930                 :                       "AND a.attgenerated = ''" : ""),
     931                 :                      lrel->remoteid);
     932             155 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
     933                 :                       lengthof(attrRow), attrRow);
     934                 : 
     935             155 :     if (res->status != WALRCV_OK_TUPLES)
     936 UIC           0 :         ereport(ERROR,
     937                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     938 ECB             :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     939 EUB             :                         nspname, relname, res->err)));
     940                 : 
     941                 :     /* We don't know the number of rows coming, so allocate enough space. */
     942 GIC         155 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     943             155 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     944             155 :     lrel->attkeys = NULL;
     945                 : 
     946                 :     /*
     947                 :      * Store the columns as a list of names.  Ignore those that are not
     948                 :      * present in the column list, if there is one.
     949                 :      */
     950 CBC         155 :     natt = 0;
     951             155 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     952 GIC         432 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     953 ECB             :     {
     954                 :         char       *rel_colname;
     955                 :         AttrNumber  attnum;
     956                 : 
     957 GIC         277 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
     958             277 :         Assert(!isnull);
     959                 : 
     960                 :         /* If the column is not in the column list, skip it. */
     961 CBC         277 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
     962 ECB             :         {
     963 CBC          22 :             ExecClearTuple(slot);
     964 GIC          22 :             continue;
     965 ECB             :         }
     966                 : 
     967 GIC         255 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
     968             255 :         Assert(!isnull);
     969 ECB             : 
     970 GIC         255 :         lrel->attnames[natt] = rel_colname;
     971 CBC         255 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
     972 GIC         255 :         Assert(!isnull);
     973 ECB             : 
     974 GIC         255 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
     975 CBC          94 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     976                 : 
     977                 :         /* Should never happen. */
     978 GIC         255 :         if (++natt >= MaxTupleAttributeNumber)
     979 UIC           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
     980                 :                  nspname, relname);
     981 ECB             : 
     982 CBC         255 :         ExecClearTuple(slot);
     983                 :     }
     984 GIC         155 :     ExecDropSingleTupleTableSlot(slot);
     985                 : 
     986             155 :     lrel->natts = natt;
     987                 : 
     988             155 :     walrcv_clear_result(res);
     989                 : 
     990                 :     /*
     991                 :      * Get relation's row filter expressions. DISTINCT avoids the same
     992                 :      * expression of a table in multiple publications from being included
     993                 :      * multiple times in the final expression.
     994                 :      *
     995 ECB             :      * We need to copy the row even if it matches just one of the
     996                 :      * publications, so we later combine all the quals with OR.
     997                 :      *
     998                 :      * For initial synchronization, row filtering can be ignored in the following
     999                 :      * cases:
    1000                 :      *
    1001                 :      * 1) one of the subscribed publications for the table hasn't specified
    1002 EUB             :      * any row filter
    1003                 :      *
    1004                 :      * 2) one of the subscribed publications has puballtables set to true
    1005                 :      *
    1006                 :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
    1007                 :      * that includes this relation
    1008 ECB             :      */
    1009 CBC         155 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
    1010 ECB             :     {
    1011                 :         StringInfoData pub_names;
    1012                 : 
    1013                 :         /* Build the pubname list. */
    1014 GIC         155 :         initStringInfo(&pub_names);
    1015             495 :         foreach(lc, MySubscription->publications)
    1016 ECB             :         {
    1017 CBC         340 :             char       *pubname = strVal(lfirst(lc));
    1018 ECB             : 
    1019 GIC         340 :             if (foreach_current_index(lc) > 0)
    1020             185 :                 appendStringInfoString(&pub_names, ", ");
    1021                 : 
    1022             340 :             appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
    1023 ECB             :         }
    1024                 : 
    1025                 :         /* Check for row filters. */
    1026 GIC         155 :         resetStringInfo(&cmd);
    1027 CBC         155 :         appendStringInfo(&cmd,
    1028                 :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
    1029 ECB             :                          "  FROM pg_publication p,"
    1030                 :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
    1031                 :                          " WHERE gpt.relid = %u"
    1032                 :                          "   AND p.pubname IN ( %s )",
    1033                 :                          lrel->remoteid,
    1034                 :                          pub_names.data);
    1035                 : 
    1036 CBC         155 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
    1037 ECB             : 
    1038 CBC         155 :         if (res->status != WALRCV_OK_TUPLES)
    1039 UIC           0 :             ereport(ERROR,
    1040 ECB             :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
    1041                 :                             nspname, relname, res->err)));
    1042                 : 
    1043                 :         /*
    1044                 :          * Multiple row filter expressions for the same table will be combined
    1045 EUB             :          * by COPY using OR. If any of the filter expressions for this table
    1046                 :          * are null, it means the whole table will be copied. In this case it
    1047                 :          * is not necessary to construct a unified row filter expression at
    1048 ECB             :          * all.
    1049                 :          */
    1050 CBC         155 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1051 GIC         169 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1052 ECB             :         {
    1053 GIC         159 :             Datum       rf = slot_getattr(slot, 1, &isnull);
    1054 ECB             : 
    1055 GIC         159 :             if (!isnull)
    1056              14 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
    1057                 :             else
    1058                 :             {
    1059                 :                 /* Ignore filters and cleanup as necessary. */
    1060             145 :                 if (*qual)
    1061                 :                 {
    1062               3 :                     list_free_deep(*qual);
    1063               3 :                     *qual = NIL;
    1064                 :                 }
    1065             145 :                 break;
    1066                 :             }
    1067                 : 
    1068              14 :             ExecClearTuple(slot);
    1069                 :         }
    1070             155 :         ExecDropSingleTupleTableSlot(slot);
    1071                 : 
    1072             155 :         walrcv_clear_result(res);
    1073                 :     }
    1074                 : 
    1075 CBC         155 :     pfree(cmd.data);
    1076 GIC         155 : }
    1077                 : 
    1078                 : /*
    1079                 :  * Copy existing data of a table from publisher.
    1080 ECB             :  *
    1081                 :  * Caller is responsible for locking the local relation.
    1082                 :  */
    1083                 : static void
    1084 GIC         155 : copy_table(Relation rel)
    1085 ECB             : {
    1086                 :     LogicalRepRelMapEntry *relmapentry;
    1087                 :     LogicalRepRelation lrel;
    1088 CBC         155 :     List       *qual = NIL;
    1089                 :     WalRcvExecResult *res;
    1090                 :     StringInfoData cmd;
    1091                 :     CopyFromState cstate;
    1092 ECB             :     List       *attnamelist;
    1093                 :     ParseState *pstate;
    1094 GNC         155 :     List       *options = NIL;
    1095                 : 
    1096                 :     /* Get the publisher relation info. */
    1097 GIC         155 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
    1098             155 :                             RelationGetRelationName(rel), &lrel, &qual);
    1099                 : 
    1100                 :     /* Put the relation into relmap. */
    1101             155 :     logicalrep_relmap_update(&lrel);
    1102                 : 
    1103 ECB             :     /* Map the publisher relation to local one. */
    1104 GIC         155 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
    1105 CBC         155 :     Assert(rel == relmapentry->localrel);
    1106 EUB             : 
    1107                 :     /* Start copy on the publisher. */
    1108 GIC         155 :     initStringInfo(&cmd);
    1109                 : 
    1110                 :     /* Regular table with no row filter */
    1111             155 :     if (lrel.relkind == RELKIND_RELATION && qual == NIL)
    1112                 :     {
    1113             132 :         appendStringInfo(&cmd, "COPY %s (",
    1114             132 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
    1115                 : 
    1116                 :         /*
    1117 ECB             :          * XXX Do we need to list the columns in all cases? Maybe we're
    1118                 :          * replicating all columns?
    1119                 :          */
    1120 CBC         352 :         for (int i = 0; i < lrel.natts; i++)
    1121                 :         {
    1122             220 :             if (i > 0)
    1123              88 :                 appendStringInfoString(&cmd, ", ");
    1124                 : 
    1125 GIC         220 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1126                 :         }
    1127 ECB             : 
    1128 GNC         132 :         appendStringInfoString(&cmd, ") TO STDOUT");
    1129 ECB             :     }
    1130                 :     else
    1131                 :     {
    1132                 :         /*
    1133                 :          * For non-tables and tables with row filters, we need to do COPY
    1134                 :          * (SELECT ...), but we can't just do SELECT * because we need to not
    1135                 :          * copy generated columns. For tables with any row filters, build a
    1136                 :          * SELECT query with OR'ed row filters for COPY.
    1137                 :          */
    1138 GIC          23 :         appendStringInfoString(&cmd, "COPY (SELECT ");
    1139 CBC          58 :         for (int i = 0; i < lrel.natts; i++)
    1140                 :         {
    1141 GIC          35 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
    1142 CBC          35 :             if (i < lrel.natts - 1)
    1143              12 :                 appendStringInfoString(&cmd, ", ");
    1144                 :         }
    1145                 : 
    1146 GIC          23 :         appendStringInfoString(&cmd, " FROM ");
    1147                 : 
    1148                 :         /*
    1149                 :          * For regular tables, make sure we don't copy data from a child that
    1150                 :          * inherits the named table as those will be copied separately.
    1151 ECB             :          */
    1152 GIC          23 :         if (lrel.relkind == RELKIND_RELATION)
    1153               7 :             appendStringInfoString(&cmd, "ONLY ");
    1154                 : 
    1155 CBC          23 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
    1156                 :         /* list of OR'ed filters */
    1157 GIC          23 :         if (qual != NIL)
    1158                 :         {
    1159                 :             ListCell   *lc;
    1160              10 :             char       *q = strVal(linitial(qual));
    1161 ECB             : 
    1162 GIC          10 :             appendStringInfo(&cmd, " WHERE %s", q);
    1163              11 :             for_each_from(lc, qual, 1)
    1164 ECB             :             {
    1165 CBC           1 :                 q = strVal(lfirst(lc));
    1166 GIC           1 :                 appendStringInfo(&cmd, " OR %s", q);
    1167                 :             }
    1168 CBC          10 :             list_free_deep(qual);
    1169                 :         }
    1170                 : 
    1171              23 :         appendStringInfoString(&cmd, ") TO STDOUT");
    1172 ECB             :     }
    1173                 : 
    1174                 :     /*
    1175                 :      * Prior to v16, initial table synchronization will use text format even
    1176                 :      * if the binary option is enabled for a subscription.
    1177                 :      */
    1178 GNC         155 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
    1179             155 :         MySubscription->binary)
    1180                 :     {
    1181               5 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
    1182               5 :         options = list_make1(makeDefElem("format",
    1183                 :                                          (Node *) makeString("binary"), -1));
    1184                 :     }
    1185                 : 
    1186 GIC         155 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
    1187             155 :     pfree(cmd.data);
    1188 CBC         155 :     if (res->status != WALRCV_OK_COPY_OUT)
    1189 UIC           0 :         ereport(ERROR,
    1190                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1191 ECB             :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
    1192                 :                         lrel.nspname, lrel.relname, res->err)));
    1193 CBC         155 :     walrcv_clear_result(res);
    1194 ECB             : 
    1195 GIC         155 :     copybuf = makeStringInfo();
    1196                 : 
    1197             155 :     pstate = make_parsestate(NULL);
    1198             155 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
    1199                 :                                          NULL, false, false);
    1200 ECB             : 
    1201 GIC         155 :     attnamelist = make_copy_attnamelist(relmapentry);
    1202 GNC         155 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
    1203 ECB             : 
    1204                 :     /* Do the copy */
    1205 CBC         154 :     (void) CopyFrom(cstate);
    1206                 : 
    1207 GIC         148 :     logicalrep_rel_close(relmapentry, NoLock);
    1208 CBC         148 : }
    1209                 : 
    1210                 : /*
    1211                 :  * Determine the tablesync slot name.
    1212                 :  *
    1213                 :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
    1214                 :  * on slot name length. We append system_identifier to avoid slot_name
    1215                 :  * collision with subscriptions in other clusters. With the current scheme
    1216                 :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 1 + 20 characters, excluding
    1217                 :  * the terminating '\0'), the maximum length of slot_name will be 50.
    1218 ECB             :  *
    1219                 :  * The returned slot name is stored in the supplied buffer (syncslotname) with
    1220                 :  * the given size.
    1221                 :  *
    1222                 :  * Note: We don't use the subscription slot name as part of tablesync slot name
    1223                 :  * because we are responsible for cleaning up these slots and it could become
    1224                 :  * impossible to recalculate what name to cleanup if the subscription slot name
    1225                 :  * had changed.
    1226                 :  */
    1227                 : void
    1228 GIC         305 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
    1229                 :                                 char *syncslotname, Size szslot)
    1230                 : {
    1231             305 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
    1232 ECB             :              relid, GetSystemIdentifier());
    1233 CBC         305 : }
    1234                 : 
    1235                 : /*
    1236 ECB             :  * Start syncing the table in the sync worker.
    1237                 :  *
    1238                 :  * If nothing needs to be done to sync the table, we exit the worker without
    1239                 :  * any further action.
    1240                 :  *
    1241                 :  * The returned slot name is palloc'ed in current memory context.
    1242                 :  */
    1243                 : char *
    1244 GIC         156 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    1245                 : {
    1246 ECB             :     char       *slotname;
    1247                 :     char       *err;
    1248                 :     char        relstate;
    1249                 :     XLogRecPtr  relstate_lsn;
    1250                 :     Relation    rel;
    1251                 :     AclResult   aclresult;
    1252                 :     WalRcvExecResult *res;
    1253                 :     char        originname[NAMEDATALEN];
    1254                 :     RepOriginId originid;
    1255                 :     bool        must_use_password;
    1256                 : 
    1257                 :     /* Check the state of the table synchronization. */
    1258 GBC         156 :     StartTransactionCommand();
    1259 GIC         156 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
    1260             156 :                                        MyLogicalRepWorker->relid,
    1261                 :                                        &relstate_lsn);
    1262 CBC         156 :     CommitTransactionCommand();
    1263                 : 
    1264             156 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1265 GIC         156 :     MyLogicalRepWorker->relstate = relstate;
    1266 CBC         156 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
    1267             156 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1268                 : 
    1269                 :     /*
    1270 ECB             :      * If synchronization is already done or no longer necessary, exit now
    1271                 :      * that we've updated shared memory state.
    1272                 :      */
    1273 GIC         156 :     switch (relstate)
    1274 ECB             :     {
    1275 UIC           0 :         case SUBREL_STATE_SYNCDONE:
    1276 ECB             :         case SUBREL_STATE_READY:
    1277                 :         case SUBREL_STATE_UNKNOWN:
    1278 UIC           0 :             finish_sync_worker();   /* doesn't return */
    1279                 :     }
    1280                 : 
    1281                 :     /* Calculate the name of the tablesync slot. */
    1282 GIC         156 :     slotname = (char *) palloc(NAMEDATALEN);
    1283             156 :     ReplicationSlotNameForTablesync(MySubscription->oid,
    1284             156 :                                     MyLogicalRepWorker->relid,
    1285                 :                                     slotname,
    1286                 :                                     NAMEDATALEN);
    1287                 : 
    1288                 :     /* Is the use of a password mandatory? */
    1289 GNC         309 :     must_use_password = MySubscription->passwordrequired &&
    1290             153 :         !superuser_arg(MySubscription->owner);
    1291                 : 
    1292                 :     /*
    1293                 :      * Here we use the slot name instead of the subscription name as the
    1294                 :      * application_name, so that it is different from the leader apply worker,
    1295                 :      * so that synchronous replication can distinguish them.
    1296                 :      */
    1297 GIC         156 :     LogRepWorkerWalRcvConn =
    1298 GNC         156 :         walrcv_connect(MySubscription->conninfo, true,
    1299                 :                        must_use_password,
    1300                 :                        slotname, &err);
    1301 GIC         156 :     if (LogRepWorkerWalRcvConn == NULL)
    1302 UIC           0 :         ereport(ERROR,
    1303 ECB             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1304                 :                  errmsg("could not connect to the publisher: %s", err)));
    1305                 : 
    1306 CBC         156 :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
    1307                 :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
    1308 ECB             :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
    1309                 : 
    1310                 :     /* Assign the origin tracking record name. */
    1311 GNC         156 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
    1312             156 :                                        MyLogicalRepWorker->relid,
    1313                 :                                        originname,
    1314                 :                                        sizeof(originname));
    1315                 : 
    1316 GIC         156 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
    1317                 :     {
    1318                 :         /*
    1319 ECB             :          * We have previously errored out before finishing the copy so the
    1320                 :          * replication slot might exist. We want to remove the slot if it
    1321                 :          * already exists and proceed.
    1322                 :          *
    1323                 :          * XXX We could instead have tried to drop the slot when we failed last
    1324                 :          * time, but for that we might need to clean up the copy state, as it
    1325                 :          * might be in the middle of fetching the rows. Also, if there was a
    1326                 :          * network breakdown the drop wouldn't have succeeded, so trying it on
    1327                 :          * the next attempt seems like a better bet.
    1328                 :          */
    1329 GIC           6 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
    1330                 :     }
    1331             150 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
    1332                 :     {
    1333 ECB             :         /*
    1334                 :          * The COPY phase was previously done, but tablesync then crashed
    1335                 :          * before it was able to finish normally.
    1336                 :          */
    1337 CBC           1 :         StartTransactionCommand();
    1338                 : 
    1339 ECB             :         /*
    1340                 :          * The origin tracking name must already exist. It was created the
    1341                 :          * first time this tablesync was launched.
    1342                 :          */
    1343 GIC           1 :         originid = replorigin_by_name(originname, false);
    1344 GNC           1 :         replorigin_session_setup(originid, 0);
    1345 GIC           1 :         replorigin_session_origin = originid;
    1346               1 :         *origin_startpos = replorigin_session_get_progress(false);
    1347                 : 
    1348 CBC           1 :         CommitTransactionCommand();
    1349                 : 
    1350 GBC           1 :         goto copy_table_done;
    1351                 :     }
    1352                 : 
    1353             155 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1354 GIC         155 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
    1355             155 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
    1356             155 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1357 ECB             : 
    1358                 :     /* Update the state and make it visible to others. */
    1359 CBC         155 :     StartTransactionCommand();
    1360 GIC         155 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1361             155 :                                MyLogicalRepWorker->relid,
    1362             155 :                                MyLogicalRepWorker->relstate,
    1363             155 :                                MyLogicalRepWorker->relstate_lsn);
    1364 CBC         155 :     CommitTransactionCommand();
    1365             155 :     pgstat_report_stat(true);
    1366                 : 
    1367 GIC         155 :     StartTransactionCommand();
    1368                 : 
    1369                 :     /*
    1370                 :      * Use a standard write lock here. It might be better to disallow access
    1371                 :      * to the table while it's being synchronized. But we don't want to block
    1372 ECB             :      * the main apply process from working and it has to open the relation in
    1373                 :      * RowExclusiveLock when remapping remote relation id to local one.
    1374                 :      */
    1375 GIC         155 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
    1376 ECB             : 
    1377 EUB             :     /*
    1378                 :      * Check that our table sync worker has permission to insert into the
    1379                 :      * target table.
    1380                 :      */
    1381 CBC         155 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    1382                 :                                   ACL_INSERT);
    1383 GIC         155 :     if (aclresult != ACLCHECK_OK)
    1384 UIC           0 :         aclcheck_error(aclresult,
    1385               0 :                        get_relkind_objtype(rel->rd_rel->relkind),
    1386 LBC           0 :                        RelationGetRelationName(rel));
    1387 ECB             : 
    1388                 :     /*
    1389                 :      * COPY FROM does not honor RLS policies.  That is not a problem for
    1390                 :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
    1391                 :      * who has it implicitly), but other roles should not be able to
    1392                 :      * circumvent RLS.  Disallow logical replication into RLS enabled
    1393                 :      * relations for such roles.
    1394                 :      */
    1395 GIC         155 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
    1396 UIC           0 :         ereport(ERROR,
    1397                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1398                 :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
    1399                 :                         GetUserNameFromId(GetUserId(), true),
    1400                 :                         RelationGetRelationName(rel))));
    1401                 : 
    1402                 :     /*
    1403                 :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
    1404 ECB             :      * ensures that both the replication slot we create (see below) and the
    1405                 :      * COPY are consistent with each other.
    1406                 :      */
    1407 GIC         155 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
    1408                 :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
    1409                 :                       0, NULL);
    1410             155 :     if (res->status != WALRCV_OK_COMMAND)
    1411 UIC           0 :         ereport(ERROR,
    1412 ECB             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1413                 :                  errmsg("table copy could not start transaction on publisher: %s",
    1414                 :                         res->err)));
    1415 GIC         155 :     walrcv_clear_result(res);
    1416                 : 
    1417                 :     /*
    1418 ECB             :      * Create a new permanent logical decoding slot. This slot will be used
    1419                 :      * for the catchup phase after COPY is done, so tell it to use the
    1420                 :      * snapshot to make the final data consistent.
    1421                 :      */
    1422 GIC         155 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
    1423 ECB             :                        slotname, false /* permanent */ , false /* two_phase */ ,
    1424                 :                        CRS_USE_SNAPSHOT, origin_startpos);
    1425                 : 
    1426                 :     /*
    1427                 :      * Setup replication origin tracking. The purpose of doing this before the
    1428                 :      * copy is to avoid doing the copy again due to any error in setting up
    1429                 :      * origin tracking.
    1430                 :      */
    1431 CBC         155 :     originid = replorigin_by_name(originname, true);
    1432 GIC         155 :     if (!OidIsValid(originid))
    1433                 :     {
    1434 ECB             :         /*
    1435                 :          * Origin tracking does not exist, so create it now.
    1436                 :          *
    1437                 :          * Then advance to the LSN obtained from walrcv_create_slot. This is WAL
    1438                 :          * logged for the purpose of recovery. Locks are to prevent the
    1439                 :          * replication origin from vanishing while advancing.
    1440                 :          */
    1441 GIC         155 :         originid = replorigin_create(originname);
    1442 ECB             : 
    1443 GIC         155 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1444             155 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
    1445                 :                            true /* go backward */ , true /* WAL log */ );
    1446             155 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1447                 : 
    1448 GNC         155 :         replorigin_session_setup(originid, 0);
    1449 GIC         155 :         replorigin_session_origin = originid;
    1450 ECB             :     }
    1451                 :     else
    1452                 :     {
    1453 UIC           0 :         ereport(ERROR,
    1454                 :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1455                 :                  errmsg("replication origin \"%s\" already exists",
    1456 ECB             :                         originname)));
    1457                 :     }
    1458                 : 
    1459 EUB             :     /* Now do the initial data copy */
    1460 GBC         155 :     PushActiveSnapshot(GetTransactionSnapshot());
    1461             155 :     copy_table(rel);
    1462 GIC         148 :     PopActiveSnapshot();
    1463                 : 
    1464             148 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
    1465             148 :     if (res->status != WALRCV_OK_COMMAND)
    1466 UIC           0 :         ereport(ERROR,
    1467                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1468                 :                  errmsg("table copy could not finish transaction on publisher: %s",
    1469                 :                         res->err)));
    1470 CBC         148 :     walrcv_clear_result(res);
    1471 EUB             : 
    1472 GIC         148 :     table_close(rel, NoLock);
    1473                 : 
    1474                 :     /* Make the copy visible. */
    1475             148 :     CommandCounterIncrement();
    1476                 : 
    1477                 :     /*
    1478                 :      * Update the persisted state to indicate the COPY phase is done; make it
    1479                 :      * visible to others.
    1480                 :      */
    1481             148 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
    1482 CBC         148 :                                MyLogicalRepWorker->relid,
    1483                 :                                SUBREL_STATE_FINISHEDCOPY,
    1484 GIC         148 :                                MyLogicalRepWorker->relstate_lsn);
    1485 ECB             : 
    1486 GBC         148 :     CommitTransactionCommand();
    1487                 : 
    1488 GIC         149 : copy_table_done:
    1489                 : 
    1490 CBC         149 :     elog(DEBUG1,
    1491                 :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
    1492                 :          originname, LSN_FORMAT_ARGS(*origin_startpos));
    1493                 : 
    1494                 :     /*
    1495                 :      * We are done with the initial data synchronization, update the state.
    1496                 :      */
    1497             149 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    1498 GIC         149 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
    1499             149 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
    1500             149 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    1501                 : 
    1502                 :     /*
    1503                 :      * Finally, wait until the leader apply worker tells us to catch up and
    1504                 :      * then return to let LogicalRepApplyLoop do it.
    1505                 :      */
    1506 CBC         149 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
    1507             149 :     return slotname;
    1508                 : }
    1509                 : 
    1510                 : /*
    1511                 :  * Common code to fetch the up-to-date sync state info into the static lists.
    1512                 :  *
    1513                 :  * Returns true if subscription has 1 or more tables, else false.
    1514                 :  *
    1515                 :  * Note: If this function started the transaction (indicated by the parameter)
    1516 ECB             :  * then it is the caller's responsibility to commit it.
    1517                 :  */
    1518                 : static bool
    1519 CBC        2534 : FetchTableStates(bool *started_tx)
    1520                 : {
    1521 ECB             :     static bool has_subrels = false;
    1522                 : 
    1523 CBC        2534 :     *started_tx = false;
    1524 ECB             : 
    1525 GIC        2534 :     if (!table_states_valid)
    1526                 :     {
    1527                 :         MemoryContext oldctx;
    1528 EUB             :         List       *rstates;
    1529                 :         ListCell   *lc;
    1530                 :         SubscriptionRelState *rstate;
    1531                 : 
    1532                 :         /* Clean the old lists. */
    1533 GIC         664 :         list_free_deep(table_states_not_ready);
    1534             664 :         table_states_not_ready = NIL;
    1535 ECB             : 
    1536 CBC         664 :         if (!IsTransactionState())
    1537 ECB             :         {
    1538 GIC         651 :             StartTransactionCommand();
    1539 CBC         651 :             *started_tx = true;
    1540 ECB             :         }
    1541 EUB             : 
    1542                 :         /* Fetch all non-ready tables. */
    1543 GNC         664 :         rstates = GetSubscriptionRelations(MySubscription->oid, true);
    1544                 : 
    1545 ECB             :         /* Allocate the tracking info in a permanent memory context. */
    1546 GIC         664 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1547 CBC        1855 :         foreach(lc, rstates)
    1548                 :         {
    1549 GIC        1191 :             rstate = palloc(sizeof(SubscriptionRelState));
    1550 CBC        1191 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
    1551 GIC        1191 :             table_states_not_ready = lappend(table_states_not_ready, rstate);
    1552                 :         }
    1553             664 :         MemoryContextSwitchTo(oldctx);
    1554                 : 
    1555                 :         /*
    1556 ECB             :          * Does the subscription have tables?
    1557                 :          *
    1558                 :          * If there were not-READY relations found then we know it does. But
    1559                 :          * if table_states_not_ready was empty we still need to check again to
    1560                 :          * see if there are 0 tables.
    1561                 :          */
    1562 GNC         831 :         has_subrels = (table_states_not_ready != NIL) ||
    1563 CBC         167 :             HasSubscriptionRelations(MySubscription->oid);
    1564                 : 
    1565             664 :         table_states_valid = true;
    1566                 :     }
    1567                 : 
    1568 GIC        2534 :     return has_subrels;
    1569                 : }
    1570                 : 
    1571                 : /*
    1572 ECB             :  * If the subscription has no tables then return false.
    1573                 :  *
    1574                 :  * Otherwise, are all tablesyncs READY?
    1575                 :  *
    1576                 :  * Note: This function is not suitable to be called from outside of apply or
    1577                 :  * tablesync workers because MySubscription needs to be already initialized.
    1578                 :  */
    1579                 : bool
    1580 GIC          58 : AllTablesyncsReady(void)
    1581 ECB             : {
    1582 CBC          58 :     bool        started_tx = false;
    1583 GIC          58 :     bool        has_subrels = false;
    1584                 : 
    1585                 :     /* We need up-to-date sync state info for subscription tables here. */
    1586              58 :     has_subrels = FetchTableStates(&started_tx);
    1587                 : 
    1588              58 :     if (started_tx)
    1589                 :     {
    1590               9 :         CommitTransactionCommand();
    1591               9 :         pgstat_report_stat(true);
    1592                 :     }
    1593                 : 
    1594 ECB             :     /*
    1595                 :      * Return false when there are no tables in subscription or not all tables
    1596                 :      * are in ready state; true otherwise.
    1597                 :      */
    1598 GNC          58 :     return has_subrels && (table_states_not_ready == NIL);
    1599                 : }
    1600 ECB             : 
    1601                 : /*
    1602                 :  * Update the two_phase state of the specified subscription in pg_subscription.
    1603                 :  */
    1604                 : void
    1605 GIC           4 : UpdateTwoPhaseState(Oid suboid, char new_state)
    1606                 : {
    1607                 :     Relation    rel;
    1608 ECB             :     HeapTuple   tup;
    1609                 :     bool        nulls[Natts_pg_subscription];
    1610                 :     bool        replaces[Natts_pg_subscription];
    1611                 :     Datum       values[Natts_pg_subscription];
    1612                 : 
    1613 CBC           4 :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
    1614 ECB             :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
    1615                 :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
    1616                 : 
    1617 GIC           4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1618 CBC           4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
    1619 GIC           4 :     if (!HeapTupleIsValid(tup))
    1620 UIC           0 :         elog(ERROR,
    1621 ECB             :              "cache lookup failed for subscription oid %u",
    1622                 :              suboid);
    1623                 : 
    1624                 :     /* Form a new tuple. */
    1625 CBC           4 :     memset(values, 0, sizeof(values));
    1626               4 :     memset(nulls, false, sizeof(nulls));
    1627 GIC           4 :     memset(replaces, false, sizeof(replaces));
    1628 ECB             : 
    1629                 :     /* And update/set two_phase state */
    1630 GIC           4 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
    1631               4 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
    1632                 : 
    1633               4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
    1634                 :                             values, nulls, replaces);
    1635               4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1636                 : 
    1637 CBC           4 :     heap_freetuple(tup);
    1638               4 :     table_close(rel, RowExclusiveLock);
    1639 GIC           4 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a