LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions)
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128
Current Date: 2024-04-14 14:21:10
Baseline: 16@8cea358b128
Baseline Date: 2024-04-14 14:21:09
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed

                     Coverage Total   Hit   UNC   LBC   UBC   GIC   GNC   CBC   DCB
Lines:                 91.2 %   536   489     4          43     1    47   441    13
Functions:            100.0 %    19    19                             8    11
Branches:              65.9 %   317   209     8     1    99          17   192

Line coverage date bins:
[..60] days:          100.0 %     1     1                             1
(60,120] days:        100.0 %     4     4                             4
(120,180] days:       100.0 %    15    15                             2    13
(180,240] days:        81.8 %    11     9     2                       9
(240..) days:          91.1 %   505   460     2          43     1    31   428

Function coverage date bins:
(240..) days:         100.0 %    19    19                             8    11

Branch coverage date bins:
(60,120] days:         87.5 %     8     7     1                       7
(120,180] days:        80.0 %    10     8     1           1           1     7
(180,240] days:        42.9 %     7     3     4                       3
(240..) days:          65.4 %   292   191     2     1    98           6   185
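
The percentages in the summary can be cross-checked from the category columns. The following is a minimal sketch in C, assuming that Hit is the sum of the GIC, GNC and CBC columns and that Total adds UNC, LBC and UBC to it (an interpretation that matches the figures in this report, with DCB counted in neither). The CoverageRow type and the row_* helpers are hypothetical names introduced only for illustration.

```c
#include <assert.h>

/*
 * One row of the summary, split by category.  Field names follow the
 * column headers of the report; the grouping into "hit" and "not hit"
 * is an assumption that is consistent with the totals shown.
 */
typedef struct CoverageRow
{
    int unc, lbc, ubc;      /* categories assumed to count as not hit */
    int gic, gnc, cbc;      /* categories assumed to count as hit */
} CoverageRow;

/* Number of hit items: sum of the hit categories. */
static int
row_hit(const CoverageRow *r)
{
    return r->gic + r->gnc + r->cbc;
}

/* Total items: hit plus all not-hit categories. */
static int
row_total(const CoverageRow *r)
{
    return row_hit(r) + r->unc + r->lbc + r->ubc;
}

/* Coverage in tenths of a percent, rounded to the nearest tenth. */
static int
row_coverage_tenths(const CoverageRow *r)
{
    int total = row_total(r);

    assert(total > 0);
    return (row_hit(r) * 1000 + total / 2) / total;
}
```

For the Lines row (UNC 4, UBC 43, GIC 1, GNC 47, CBC 441) these helpers give 489 hit out of 536 total and 912 tenths of a percent, which corresponds to the reported 91.2 %.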

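The header comment of tablesync.c in the listing that follows describes a fixed state progression (INIT -> DATASYNC -> FINISHEDCOPY -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY), notes that SYNCWAIT and CATCHUP exist only in shared memory, and says which worker sets each state. A minimal standalone sketch of those rules is given here. It is not PostgreSQL code: the SyncState enum and the sync_state_* helpers are hypothetical names, whereas the real file uses the SUBREL_STATE_* constants.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical enum mirroring the order given in the header comment. */
typedef enum SyncState
{
    ST_INIT,
    ST_DATASYNC,
    ST_FINISHEDCOPY,
    ST_SYNCWAIT,
    ST_CATCHUP,
    ST_SYNCDONE,
    ST_READY
} SyncState;

/*
 * Advance to the next state in the fixed progression.
 * Returns false when cur is already READY (the final state).
 */
static bool
sync_state_next(SyncState cur, SyncState *next)
{
    if (cur == ST_READY)
        return false;
    *next = (SyncState) (cur + 1);
    return true;
}

/*
 * Per the header comment, the catalog holds every state except
 * SYNCWAIT and CATCHUP, which live only in shared memory.
 */
static bool
sync_state_in_catalog(SyncState s)
{
    return s != ST_SYNCWAIT && s != ST_CATCHUP;
}

/*
 * Per the header comment, the apply worker sets INIT, CATCHUP and READY;
 * the tablesync worker sets the remaining states.
 */
static bool
sync_state_set_by_apply(SyncState target)
{
    return target == ST_INIT || target == ST_CATCHUP || target == ST_READY;
}
```

Walking from ST_INIT with sync_state_next takes six transitions to reach ST_READY, matching the seven states named in the comment.
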
 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * tablesync.c
                                  3                 :                :  *    PostgreSQL logical replication: initial table data synchronization
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/tablesync.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This file contains code for initial table data synchronization for
                                 12                 :                :  *    logical replication.
                                 13                 :                :  *
                                 14                 :                :  *    The initial data synchronization is done separately for each table,
                                 15                 :                :  *    in a separate apply worker that only fetches the initial snapshot data
                                 16                 :                :  *    from the publisher and then synchronizes the position in the stream with
                                 17                 :                :  *    the leader apply worker.
                                 18                 :                :  *
                                 19                 :                :  *    There are several reasons for doing the synchronization this way:
                                 20                 :                :  *     - It allows us to parallelize the initial data synchronization
                                 21                 :                :  *       which lowers the time needed for it to happen.
                                 22                 :                :  *     - The initial synchronization does not have to hold the xid and LSN
                                 23                 :                :  *       for the time it takes to copy data of all tables, causing less
                                 24                 :                :  *       bloat and lower disk consumption compared to doing the
                                 25                 :                :  *       synchronization in a single process for the whole database.
                                 26                 :                :  *     - It allows us to synchronize any tables added after the initial
                                 27                 :                :  *       synchronization has finished.
                                 28                 :                :  *
                                 29                 :                :  *    The stream position synchronization works in multiple steps:
                                 30                 :                :  *     - Apply worker requests a tablesync worker to start, setting the new
                                 31                 :                :  *       table state to INIT.
                                 32                 :                :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
                                 33                 :                :  *       copying.
                                 34                 :                :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
                                 35                 :                :  *       worker specific) state to indicate when the copy phase has completed, so
                                 36                 :                :  *       if the worker crashes with this (non-memory) state then the copy will not
                                 37                 :                :  *       be re-attempted.
                                 38                 :                :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
                                 39                 :                :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
                                 40                 :                :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
                                 41                 :                :  *       until either the table state is set to SYNCDONE or the sync worker
                                 42                 :                :  *       exits.
                                 43                 :                :  *     - After the sync worker has seen the state change to CATCHUP, it will
                                 44                 :                :  *       read the stream and apply changes (acting like an apply worker) until
                                 45                 :                :  *       it catches up to the specified stream position.  Then it sets the
                                 46                 :                :  *       state to SYNCDONE.  There might be zero changes applied between
                                 47                 :                :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
                                 48                 :                :  *       apply worker.
                                 49                 :                :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
                                 50                 :                :  *       the table until it reaches the SYNCDONE stream position, at which
                                 51                 :                :  *       point it sets state to READY and stops tracking.  Again, there might
                                 52                 :                :  *       be zero changes in between.
                                 53                 :                :  *
                                 54                 :                :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
                                 55                 :                :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
                                 56                 :                :  *
                                 57                 :                :  *    The catalog pg_subscription_rel is used to keep information about
                                 58                 :                :  *    subscribed tables and their state.  The catalog holds all states
                                 59                 :                :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
                                 60                 :                :  *
                                 61                 :                :  *    Example flows look like this:
                                 62                 :                :  *     - Apply is in front:
                                 63                 :                :  *        sync:8
                                 64                 :                :  *          -> set in catalog FINISHEDCOPY
                                 65                 :                :  *          -> set in memory SYNCWAIT
                                 66                 :                :  *        apply:10
                                 67                 :                :  *          -> set in memory CATCHUP
                                 68                 :                :  *          -> enter wait-loop
                                 69                 :                :  *        sync:10
                                 70                 :                :  *          -> set in catalog SYNCDONE
                                 71                 :                :  *          -> exit
                                 72                 :                :  *        apply:10
                                 73                 :                :  *          -> exit wait-loop
                                 74                 :                :  *          -> continue rep
                                 75                 :                :  *        apply:11
                                 76                 :                :  *          -> set in catalog READY
                                 77                 :                :  *
                                 78                 :                :  *     - Sync is in front:
                                 79                 :                :  *        sync:10
                                 80                 :                :  *          -> set in catalog FINISHEDCOPY
                                 81                 :                :  *          -> set in memory SYNCWAIT
                                 82                 :                :  *        apply:8
                                 83                 :                :  *          -> set in memory CATCHUP
                                 84                 :                :  *          -> continue per-table filtering
                                 85                 :                :  *        sync:10
                                 86                 :                :  *          -> set in catalog SYNCDONE
                                 87                 :                :  *          -> exit
                                 88                 :                :  *        apply:10
                                 89                 :                :  *          -> set in catalog READY
                                 90                 :                :  *          -> stop per-table filtering
                                 91                 :                :  *          -> continue rep
                                 92                 :                :  *-------------------------------------------------------------------------
                                 93                 :                :  */
                                 94                 :                : 
                                 95                 :                : #include "postgres.h"
                                 96                 :                : 
                                 97                 :                : #include "access/table.h"
                                 98                 :                : #include "access/xact.h"
                                 99                 :                : #include "catalog/indexing.h"
                                100                 :                : #include "catalog/pg_subscription_rel.h"
                                101                 :                : #include "catalog/pg_type.h"
                                102                 :                : #include "commands/copy.h"
                                103                 :                : #include "miscadmin.h"
                                104                 :                : #include "nodes/makefuncs.h"
                                105                 :                : #include "parser/parse_relation.h"
                                106                 :                : #include "pgstat.h"
                                107                 :                : #include "replication/logicallauncher.h"
                                108                 :                : #include "replication/logicalrelation.h"
                                109                 :                : #include "replication/logicalworker.h"
                                110                 :                : #include "replication/origin.h"
                                111                 :                : #include "replication/slot.h"
                                112                 :                : #include "replication/walreceiver.h"
                                113                 :                : #include "replication/worker_internal.h"
                                114                 :                : #include "storage/ipc.h"
                                115                 :                : #include "storage/lmgr.h"
                                116                 :                : #include "utils/acl.h"
                                117                 :                : #include "utils/array.h"
                                118                 :                : #include "utils/builtins.h"
                                119                 :                : #include "utils/lsyscache.h"
                                120                 :                : #include "utils/memutils.h"
                                121                 :                : #include "utils/rls.h"
                                122                 :                : #include "utils/snapmgr.h"
                                123                 :                : #include "utils/syscache.h"
                                124                 :                : #include "utils/usercontext.h"
                                125                 :                : 
                                126                 :                : static bool table_states_valid = false;
                                127                 :                : static List *table_states_not_ready = NIL;
                                128                 :                : static bool FetchTableStates(bool *started_tx);
                                129                 :                : 
                                130                 :                : static StringInfo copybuf = NULL;
                                131                 :                : 
                                132                 :                : /*
                                133                 :                :  * Exit routine for synchronization worker.
                                134                 :                :  */
                                135                 :                : static void
                                136                 :                : pg_attribute_noreturn()
 2579 peter_e@gmx.net           137                 :CBC         160 : finish_sync_worker(void)
                                138                 :                : {
                                139                 :                :     /*
                                140                 :                :      * Commit any outstanding transaction. This is the usual case, unless
                                141                 :                :      * there was nothing to do for the table.
                                142                 :                :      */
                                143         [ +  - ]:            160 :     if (IsTransactionState())
                                144                 :                :     {
                                145                 :            160 :         CommitTransactionCommand();
  739 andres@anarazel.de        146                 :            160 :         pgstat_report_stat(true);
                                147                 :                :     }
                                148                 :                : 
                                149                 :                :     /* And flush all writes. */
 2579 peter_e@gmx.net           150                 :            160 :     XLogFlush(GetXLogWriteRecPtr());
                                151                 :                : 
 2517                           152                 :            160 :     StartTransactionCommand();
 2579                           153         [ +  - ]:            160 :     ereport(LOG,
                                154                 :                :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
                                155                 :                :                     MySubscription->name,
                                156                 :                :                     get_rel_name(MyLogicalRepWorker->relid))));
 2517                           157                 :            160 :     CommitTransactionCommand();
                                158                 :                : 
                                159                 :                :     /* Find the leader apply worker and signal it. */
 2504                           160                 :            160 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
                                161                 :                : 
                                162                 :                :     /* Stop gracefully */
 2579                           163                 :            160 :     proc_exit(0);
                                164                 :                : }
                                165                 :                : 
                                166                 :                : /*
                                167                 :                :  * Wait until the relation sync state is set in the catalog to the expected
                                168                 :                :  * one; return true when it happens.
                                169                 :                :  *
                                170                 :                :  * Returns false if the table sync worker or the table itself have
                                171                 :                :  * disappeared, or the table state has been reset.
                                172                 :                :  *
                                173                 :                :  * Currently, this is used in the apply worker when transitioning from
                                174                 :                :  * CATCHUP state to SYNCDONE.
                                175                 :                :  */
                                176                 :                : static bool
 2504                           177                 :            160 : wait_for_relation_state_change(Oid relid, char expected_state)
                                178                 :                : {
                                179                 :                :     char        state;
                                180                 :                : 
                                181                 :                :     for (;;)
 2579                           182                 :            179 :     {
                                183                 :                :         LogicalRepWorker *worker;
                                184                 :                :         XLogRecPtr  statelsn;
                                185                 :                : 
 2508                           186         [ -  + ]:            339 :         CHECK_FOR_INTERRUPTS();
                                187                 :                : 
 1277 alvherre@alvh.no-ip.      188                 :            339 :         InvalidateCatalogSnapshot();
 2504 peter_e@gmx.net           189                 :            339 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                190                 :                :                                         relid, &statelsn);
                                191                 :                : 
                                192         [ -  + ]:            339 :         if (state == SUBREL_STATE_UNKNOWN)
 1277 alvherre@alvh.no-ip.      193                 :UBC           0 :             break;
                                194                 :                : 
 2504 peter_e@gmx.net           195         [ +  + ]:CBC         339 :         if (state == expected_state)
                                196                 :            160 :             return true;
                                197                 :                : 
                                198                 :                :         /* Check if the sync worker is still running and bail if not. */
 2579                           199                 :            179 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 1277 alvherre@alvh.no-ip.      200                 :            179 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
                                201                 :                :                                         false);
 2504 peter_e@gmx.net           202                 :            179 :         LWLockRelease(LogicalRepWorkerLock);
 2579                           203         [ -  + ]:            179 :         if (!worker)
 1277 alvherre@alvh.no-ip.      204                 :UBC           0 :             break;
                                205                 :                : 
 1969 tmunro@postgresql.or      206                 :CBC         179 :         (void) WaitLatch(MyLatch,
                                207                 :                :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                208                 :                :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                209                 :                : 
 2504 andres@anarazel.de        210                 :            179 :         ResetLatch(MyLatch);
                                211                 :                :     }
                                212                 :                : 
 2504 peter_e@gmx.net           213                 :UBC           0 :     return false;
                                214                 :                : }
                                215                 :                : 
                                216                 :                : /*
                                217                 :                :  * Wait until the apply worker changes the state of our synchronization
                                218                 :                :  * worker to the expected one.
                                219                 :                :  *
                                220                 :                :  * Used when transitioning from SYNCWAIT state to CATCHUP.
                                221                 :                :  *
                                222                 :                :  * Returns false if the apply worker has disappeared.
                                223                 :                :  */
                                224                 :                : static bool
 2504 peter_e@gmx.net           225                 :CBC         160 : wait_for_worker_state_change(char expected_state)
                                226                 :                : {
                                227                 :                :     int         rc;
                                228                 :                : 
                                229                 :                :     for (;;)
                                230                 :            160 :     {
                                231                 :                :         LogicalRepWorker *worker;
                                232                 :                : 
                                233         [ -  + ]:            320 :         CHECK_FOR_INTERRUPTS();
                                234                 :                : 
                                235                 :                :         /*
                                236                 :                :          * Done if already in correct state.  (We assume this fetch is atomic
                                237                 :                :          * enough to not give a misleading answer if we do it with no lock.)
                                238                 :                :          */
 2480 tgl@sss.pgh.pa.us         239         [ +  + ]:            320 :         if (MyLogicalRepWorker->relstate == expected_state)
                                240                 :            160 :             return true;
                                241                 :                : 
                                242                 :                :         /*
                                243                 :                :          * Bail out if the apply worker has died, else signal it we're
                                244                 :                :          * waiting.
                                245                 :                :          */
 2504 peter_e@gmx.net           246                 :            160 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                247                 :            160 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                248                 :                :                                         InvalidOid, false);
 2480 tgl@sss.pgh.pa.us         249   [ +  -  +  - ]:            160 :         if (worker && worker->proc)
                                250                 :            160 :             logicalrep_worker_wakeup_ptr(worker);
 2504 peter_e@gmx.net           251                 :            160 :         LWLockRelease(LogicalRepWorkerLock);
                                252         [ -  + ]:            160 :         if (!worker)
 2480 tgl@sss.pgh.pa.us         253                 :UBC           0 :             break;
                                254                 :                : 
                                255                 :                :         /*
                                256                 :                :          * Wait.  We expect to get a latch signal back from the apply worker,
                                257                 :                :          * but use a timeout in case it dies without sending one.
                                258                 :                :          */
 2504 andres@anarazel.de        259                 :CBC         160 :         rc = WaitLatch(MyLatch,
                                260                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                261                 :                :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                262                 :                : 
 2480 tgl@sss.pgh.pa.us         263         [ +  - ]:            160 :         if (rc & WL_LATCH_SET)
                                264                 :            160 :             ResetLatch(MyLatch);
                                265                 :                :     }
                                266                 :                : 
 2579 peter_e@gmx.net           267                 :UBC           0 :     return false;
                                268                 :                : }
                                269                 :                : 
                                270                 :                : /*
                                271                 :                :  * Callback from syscache invalidation.
                                272                 :                :  */
                                273                 :                : void
 2579 peter_e@gmx.net           274                 :CBC        1548 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
                                275                 :                : {
                                276                 :           1548 :     table_states_valid = false;
                                277                 :           1548 : }
                                278                 :                : 
                                279                 :                : /*
                                280                 :                :  * Handle table synchronization cooperation from the synchronization
                                281                 :                :  * worker.
                                282                 :                :  *
                                283                 :                :  * If the sync worker is in CATCHUP state and reached (or passed) the
                                284                 :                :  * predetermined synchronization point in the WAL stream, mark the table as
                                285                 :                :  * SYNCDONE and finish.
                                286                 :                :  */
                                287                 :                : static void
                                288                 :            204 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                289                 :                : {
                                290         [ -  + ]:            204 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                                291                 :                : 
                                292         [ +  - ]:            204 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
                                293         [ +  + ]:            204 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
                                294                 :                :     {
                                295                 :                :         TimeLineID  tli;
 1157 akapila@postgresql.o      296                 :            160 :         char        syncslotname[NAMEDATALEN] = {0};
  593                           297                 :            160 :         char        originname[NAMEDATALEN] = {0};
                                298                 :                : 
 2504 peter_e@gmx.net           299                 :            160 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 2579                           300                 :            160 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
                                301                 :                : 
                                302                 :            160 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                303                 :                : 
                                304                 :                :         /*
                                305                 :                :          * UpdateSubscriptionRelState must be called within a transaction.
                                306                 :                :          */
 1157 akapila@postgresql.o      307         [ +  - ]:            160 :         if (!IsTransactionState())
                                308                 :            160 :             StartTransactionCommand();
                                309                 :                : 
 2200 peter_e@gmx.net           310                 :            160 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                311                 :            160 :                                    MyLogicalRepWorker->relid,
                                312                 :            160 :                                    MyLogicalRepWorker->relstate,
                                313                 :            160 :                                    MyLogicalRepWorker->relstate_lsn);
                                314                 :                : 
                                315                 :                :         /*
                                316                 :                :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
                                317                 :                :          * the slot.
                                318                 :                :          */
 1068 alvherre@alvh.no-ip.      319                 :            160 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
                                320                 :                : 
                                321                 :                :         /*
                                322                 :                :          * Clean up the tablesync slot.
                                323                 :                :          *
                                324                 :                :          * This has to be done after updating the state because otherwise if
                                325                 :                :          * there is an error while doing the database operations we won't be
                                326                 :                :          * able to roll back the dropped slot.
                                327                 :                :          */
 1157 akapila@postgresql.o      328                 :            160 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
                                329                 :            160 :                                         MyLogicalRepWorker->relid,
                                330                 :                :                                         syncslotname,
                                331                 :                :                                         sizeof(syncslotname));
                                332                 :                : 
                                333                 :                :         /*
                                334                 :                :          * It is important to give an error if we are unable to drop the slot;
                                335                 :                :          * otherwise, it won't be dropped till the corresponding subscription
                                336                 :                :          * is dropped. So passing missing_ok = false.
                                337                 :                :          */
 1068 alvherre@alvh.no-ip.      338                 :            160 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
                                339                 :                : 
  580 akapila@postgresql.o      340                 :            160 :         CommitTransactionCommand();
                                341                 :            160 :         pgstat_report_stat(false);
                                342                 :                : 
                                343                 :                :         /*
                                344                 :                :          * Start a new transaction to clean up the tablesync origin tracking.
                                345                 :                :          * This transaction will be ended within finish_sync_worker().
                                346                 :                :          * Even if we fail to remove the origin here, the apply worker will
                                347                 :                :          * clean it up afterward.
                                348                 :                :          *
                                349                 :                :          * We need to do this after the table state is set to SYNCDONE.
                                350                 :                :          * Otherwise, if an error occurs while performing the database
                                351                 :                :          * operation, the worker will be restarted and the in-memory state of
                                352                 :                :          * replication progress (remote_lsn), which would have been cleared
                                353                 :                :          * before the restart, won't be rolled back. So, the restarted worker
                                354                 :                :          * will use an invalid replication progress state, resulting in replay
                                355                 :                :          * of transactions that have already been applied.
                                356                 :                :          */
                                357                 :            160 :         StartTransactionCommand();
                                358                 :                : 
  551                           359                 :            160 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                360                 :            160 :                                            MyLogicalRepWorker->relid,
                                361                 :                :                                            originname,
                                362                 :                :                                            sizeof(originname));
                                363                 :                : 
                                364                 :                :         /*
                                365                 :                :          * Resetting the origin session removes the ownership of the slot.
                                366                 :                :          * This is needed to allow the origin to be dropped.
                                367                 :                :          */
  580                           368                 :            160 :         replorigin_session_reset();
                                369                 :            160 :         replorigin_session_origin = InvalidRepOriginId;
                                370                 :            160 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
                                371                 :            160 :         replorigin_session_origin_timestamp = 0;
                                372                 :                : 
                                373                 :                :         /*
                                374                 :                :          * Drop the tablesync's origin tracking if it exists.
                                375                 :                :          *
                                376                 :                :          * There is a chance that the user is concurrently performing refresh
                                377                 :                :          * for the subscription where we remove the table state and its origin
                                378                 :                :          * or the apply worker would have removed this origin. So passing
                                379                 :                :          * missing_ok = true.
                                380                 :                :          */
                                381                 :            160 :         replorigin_drop_by_name(originname, true, false);
                                382                 :                : 
 2579 peter_e@gmx.net           383                 :            160 :         finish_sync_worker();
                                384                 :                :     }
                                385                 :                :     else
                                386                 :             44 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                387                 :             44 : }
                                388                 :                : 
                                389                 :                : /*
                                390                 :                :  * Handle table synchronization cooperation from the apply worker.
                                391                 :                :  *
                                392                 :                :  * Walk over all subscription tables that are individually tracked by the
                                393                 :                :  * apply process (currently, all that have state other than
                                394                 :                :  * SUBREL_STATE_READY) and manage synchronization for them.
                                395                 :                :  *
                                396                 :                :  * If there are tables that need synchronizing and are not being synchronized
                                397                 :                :  * yet, start sync workers for them (if there are free slots for sync
                                398                 :                :  * workers).  To prevent starting the sync worker for the same relation at a
                                399                 :                :  * high frequency after a failure, we store its last start time with each sync
                                400                 :                :  * state info.  We start the sync worker for the same relation after waiting
                                401                 :                :  * at least wal_retrieve_retry_interval.
                                402                 :                :  *
                                403                 :                :  * For tables that are being synchronized already, check if sync workers
                                404                 :                :  * either need action from the apply worker or have finished.  This is the
                                405                 :                :  * SYNCWAIT to CATCHUP transition.
                                406                 :                :  *
                                407                 :                :  * If the synchronization position is reached (SYNCDONE), then the table can
                                408                 :                :  * be marked as READY and is no longer tracked.
                                409                 :                :  */
                                410                 :                : static void
                                411                 :           4388 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                                412                 :                : {
                                413                 :                :     struct tablesync_start_time_mapping
                                414                 :                :     {
                                415                 :                :         Oid         relid;
                                416                 :                :         TimestampTz last_start_time;
                                417                 :                :     };
                                418                 :                :     static HTAB *last_start_times = NULL;
                                419                 :                :     ListCell   *lc;
 2533                           420                 :           4388 :     bool        started_tx = false;
  464 tgl@sss.pgh.pa.us         421                 :           4388 :     bool        should_exit = false;
                                422                 :                : 
 2579 peter_e@gmx.net           423         [ -  + ]:           4388 :     Assert(!IsTransactionState());
                                424                 :                : 
                                425                 :                :     /* We need up-to-date sync state info for subscription tables here. */
 1005 akapila@postgresql.o      426                 :           4388 :     FetchTableStates(&started_tx);
                                427                 :                : 
                                428                 :                :     /*
                                429                 :                :      * Prepare a hash table for tracking last start times of workers, to avoid
                                430                 :                :      * immediate restarts.  We don't need it if there are no tables that need
                                431                 :                :      * syncing.
                                432                 :                :      */
  606 tgl@sss.pgh.pa.us         433   [ +  +  +  + ]:           4388 :     if (table_states_not_ready != NIL && !last_start_times)
 2544 peter_e@gmx.net           434                 :            101 :     {
                                435                 :                :         HASHCTL     ctl;
                                436                 :                : 
                                437                 :            101 :         ctl.keysize = sizeof(Oid);
                                438                 :            101 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
                                439                 :            101 :         last_start_times = hash_create("Logical replication table sync worker start times",
                                440                 :                :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
                                441                 :                :     }
                                442                 :                : 
                                443                 :                :     /*
                                444                 :                :      * Clean up the hash table when we're done with all tables (just to
                                445                 :                :      * release the bit of memory).
                                446                 :                :      */
  606 tgl@sss.pgh.pa.us         447   [ +  +  +  + ]:           4287 :     else if (table_states_not_ready == NIL && last_start_times)
                                448                 :                :     {
 2544 peter_e@gmx.net           449                 :             75 :         hash_destroy(last_start_times);
                                450                 :             75 :         last_start_times = NULL;
                                451                 :                :     }
                                452                 :                : 
                                453                 :                :     /*
                                454                 :                :      * Process all tables that are being synchronized.
                                455                 :                :      */
 1005 akapila@postgresql.o      456   [ +  +  +  +  :           5780 :     foreach(lc, table_states_not_ready)
                                              +  + ]
                                457                 :                :     {
 2524 bruce@momjian.us          458                 :           1392 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
                                459                 :                : 
 2579 peter_e@gmx.net           460         [ +  + ]:           1392 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
                                461                 :                :         {
                                462                 :                :             /*
                                463                 :                :              * Apply has caught up to the position where the table sync has
                                464                 :                :              * finished.  Mark the table as ready so that the apply will just
                                465                 :                :              * continue to replicate it normally.
                                466                 :                :              */
                                467         [ +  - ]:            158 :             if (current_lsn >= rstate->lsn)
                                468                 :                :             {
                                469                 :                :                 char        originname[NAMEDATALEN];
                                470                 :                : 
                                471                 :            158 :                 rstate->state = SUBREL_STATE_READY;
                                472                 :            158 :                 rstate->lsn = current_lsn;
 2533                           473         [ +  + ]:            158 :                 if (!started_tx)
                                474                 :                :                 {
                                475                 :              7 :                     StartTransactionCommand();
                                476                 :              7 :                     started_tx = true;
                                477                 :                :                 }
                                478                 :                : 
                                479                 :                :                 /*
                                480                 :                :                  * Remove the tablesync origin tracking if it exists.
                                481                 :                :                  *
                                482                 :                :                  * There is a chance that the user is concurrently performing
                                483                 :                :                  * refresh for the subscription where we remove the table
                                484                 :                :                  * state and its origin or the tablesync worker would have
                                485                 :                :                  * already removed this origin. We can't rely on the tablesync
                                486                 :                :                  * worker to remove the origin tracking because, if there is
                                487                 :                :                  * any error while dropping it, we won't restart the worker to
                                488                 :                :                  * drop the origin. So passing missing_ok = true.
                                489                 :                :                  */
  551 akapila@postgresql.o      490                 :            158 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                491                 :                :                                                    rstate->relid,
                                492                 :                :                                                    originname,
                                493                 :                :                                                    sizeof(originname));
  580                           494                 :            158 :                 replorigin_drop_by_name(originname, true, false);
                                495                 :                : 
                                496                 :                :                 /*
                                497                 :                :                  * Update the state to READY only after the origin cleanup.
                                498                 :                :                  */
 2200 peter_e@gmx.net           499                 :            158 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                500                 :            158 :                                            rstate->relid, rstate->state,
                                501                 :                :                                            rstate->lsn);
                                502                 :                :             }
                                503                 :                :         }
                                504                 :                :         else
                                505                 :                :         {
                                506                 :                :             LogicalRepWorker *syncworker;
                                507                 :                : 
                                508                 :                :             /*
                                509                 :                :              * Look for a sync worker for this relation.
                                510                 :                :              */
 2579                           511                 :           1234 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                512                 :                : 
                                513                 :           1234 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                514                 :                :                                                 rstate->relid, false);
                                515                 :                : 
                                516         [ +  + ]:           1234 :             if (syncworker)
                                517                 :                :             {
                                518                 :                :                 /* Found one, update our copy of its state */
                                519         [ -  + ]:            549 :                 SpinLockAcquire(&syncworker->relmutex);
                                520                 :            549 :                 rstate->state = syncworker->relstate;
                                521                 :            549 :                 rstate->lsn = syncworker->relstate_lsn;
 2480 tgl@sss.pgh.pa.us         522         [ +  + ]:            549 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                523                 :                :                 {
                                524                 :                :                     /*
                                525                 :                :                      * Sync worker is waiting for apply.  Tell sync worker it
                                526                 :                :                      * can catchup now.
                                527                 :                :                      */
                                528                 :            160 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
                                529                 :            160 :                     syncworker->relstate_lsn =
                                530                 :            160 :                         Max(syncworker->relstate_lsn, current_lsn);
                                531                 :                :                 }
 2579 peter_e@gmx.net           532                 :            549 :                 SpinLockRelease(&syncworker->relmutex);
                                533                 :                : 
                                534                 :                :                 /* If we told worker to catch up, wait for it. */
 2480 tgl@sss.pgh.pa.us         535         [ +  + ]:            549 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                536                 :                :                 {
                                537                 :                :                     /* Signal the sync worker, as it may be waiting for us. */
                                538         [ +  - ]:            160 :                     if (syncworker->proc)
                                539                 :            160 :                         logicalrep_worker_wakeup_ptr(syncworker);
                                540                 :                : 
                                541                 :                :                     /* Now safe to release the LWLock */
                                542                 :            160 :                     LWLockRelease(LogicalRepWorkerLock);
                                543                 :                : 
  125 akapila@postgresql.o      544         [ +  - ]:            160 :                     if (started_tx)
                                545                 :                :                     {
                                546                 :                :                         /*
                                547                 :                :                          * We must commit the existing transaction to release
                                548                 :                :                          * the existing locks before entering a busy loop.
                                549                 :                :                          * This is required to avoid any undetected deadlocks
                                550                 :                :                          * due to any existing lock, as the deadlock detector
                                551                 :                :                          * won't be able to detect the waits on the latch.
                                552                 :                :                          */
                                553                 :            160 :                         CommitTransactionCommand();
                                554                 :            160 :                         pgstat_report_stat(false);
                                555                 :                :                     }
                                556                 :                : 
                                557                 :                :                     /*
                                558                 :                :                      * Enter busy loop and wait for synchronization worker to
                                559                 :                :                      * reach expected state (or die trying).
                                560                 :                :                      */
                                561                 :            160 :                     StartTransactionCommand();
                                562                 :            160 :                     started_tx = true;
                                563                 :                : 
 2480 tgl@sss.pgh.pa.us         564                 :            160 :                     wait_for_relation_state_change(rstate->relid,
                                565                 :                :                                                    SUBREL_STATE_SYNCDONE);
                                566                 :                :                 }
                                567                 :                :                 else
                                568                 :            389 :                     LWLockRelease(LogicalRepWorkerLock);
                                569                 :                :             }
                                570                 :                :             else
                                571                 :                :             {
                                572                 :                :                 /*
                                573                 :                :                  * If there is no sync worker for this table yet, count
                                574                 :                :                  * running sync workers for this subscription, while we have
                                575                 :                :                  * the lock.
                                576                 :                :                  */
                                577                 :                :                 int         nsyncworkers =
  331                           578                 :            685 :                     logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
                                579                 :                : 
                                580                 :                :                 /* Now safe to release the LWLock */
 2480                           581                 :            685 :                 LWLockRelease(LogicalRepWorkerLock);
                                582                 :                : 
                                583                 :                :                 /*
                                584                 :                :                  * If there are free sync worker slot(s), start a new sync
                                585                 :                :                  * worker for the table.
                                586                 :                :                  */
                                587         [ +  + ]:            685 :                 if (nsyncworkers < max_sync_workers_per_subscription)
                                588                 :                :                 {
                                589                 :            200 :                     TimestampTz now = GetCurrentTimestamp();
                                590                 :                :                     struct tablesync_start_time_mapping *hentry;
                                591                 :                :                     bool        found;
                                592                 :                : 
                                593                 :            200 :                     hentry = hash_search(last_start_times, &rstate->relid,
                                594                 :                :                                          HASH_ENTER, &found);
                                595                 :                : 
                                596   [ +  +  +  + ]:            236 :                     if (!found ||
                                597                 :             36 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
                                598                 :                :                                                    wal_retrieve_retry_interval))
                                599                 :                :                     {
  244 akapila@postgresql.o      600                 :GNC         172 :                         logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
                                601                 :            172 :                                                  MyLogicalRepWorker->dbid,
 2480 tgl@sss.pgh.pa.us         602                 :CBC         172 :                                                  MySubscription->oid,
                                603                 :            172 :                                                  MySubscription->name,
                                604                 :            172 :                                                  MyLogicalRepWorker->userid,
                                605                 :                :                                                  rstate->relid,
                                606                 :                :                                                  DSM_HANDLE_INVALID);
                                607                 :            172 :                         hentry->last_start_time = now;
                                608                 :                :                     }
                                609                 :                :                 }
                                610                 :                :             }
                                611                 :                :         }
                                612                 :                :     }
                                613                 :                : 
 2533 peter_e@gmx.net           614         [ +  + ]:           4388 :     if (started_tx)
                                615                 :                :     {
                                616                 :                :         /*
                                617                 :                :          * Even when the two_phase mode is requested by the user, it remains
                                618                 :                :          * as 'pending' until all tablesyncs have reached READY state.
                                619                 :                :          *
                                620                 :                :          * When this happens, we restart the apply worker and (if the
                                621                 :                :          * conditions are still ok) then the two_phase tri-state will become
                                622                 :                :          * 'enabled' at that time.
                                623                 :                :          *
                                624                 :                :          * Note: If the subscription has no tables then leave the state as
                                625                 :                :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
                                626                 :                :          * work.
                                627                 :                :          */
  464 tgl@sss.pgh.pa.us         628         [ +  + ]:            714 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
                                629                 :                :         {
                                630                 :             24 :             CommandCounterIncrement();  /* make updates visible */
                                631         [ +  + ]:             24 :             if (AllTablesyncsReady())
                                632                 :                :             {
                                633         [ +  - ]:              6 :                 ereport(LOG,
                                634                 :                :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
                                635                 :                :                                 MySubscription->name)));
                                636                 :              6 :                 should_exit = true;
                                637                 :                :             }
                                638                 :                :         }
                                639                 :                : 
 2533 peter_e@gmx.net           640                 :            714 :         CommitTransactionCommand();
  739 andres@anarazel.de        641                 :            714 :         pgstat_report_stat(true);
                                642                 :                :     }
                                643                 :                : 
  464 tgl@sss.pgh.pa.us         644         [ +  + ]:           4388 :     if (should_exit)
                                645                 :                :     {
                                646                 :                :         /*
                                647                 :                :          * Reset the last-start time for this worker so that the launcher will
                                648                 :                :          * restart it without waiting for wal_retrieve_retry_interval.
                                649                 :                :          */
  448                           650                 :              6 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
                                651                 :                : 
  464                           652                 :              6 :         proc_exit(0);
                                653                 :                :     }
 2579 peter_e@gmx.net           654                 :           4382 : }
                                655                 :                : 
                                656                 :                : /*
                                657                 :                :  * Process possible state change(s) of tables that are being synchronized.
                                658                 :                :  */
                                659                 :                : void
                                660                 :           4618 : process_syncing_tables(XLogRecPtr current_lsn)
                                661                 :                : {
  236 akapila@postgresql.o      662   [ +  +  +  -  :GNC        4618 :     switch (MyLogicalRepWorker->type)
                                                 - ]
                                663                 :                :     {
                                664                 :             26 :         case WORKERTYPE_PARALLEL_APPLY:
                                665                 :                : 
                                666                 :                :             /*
                                667                 :                :              * Skip for parallel apply workers because they only operate on
                                668                 :                :              * tables that are in a READY state. See pa_can_start() and
                                669                 :                :              * should_apply_changes_for_rel().
                                670                 :                :              */
                                671                 :             26 :             break;
                                672                 :                : 
                                673                 :            204 :         case WORKERTYPE_TABLESYNC:
                                674                 :            204 :             process_syncing_tables_for_sync(current_lsn);
                                675                 :             44 :             break;
                                676                 :                : 
                                677                 :           4388 :         case WORKERTYPE_APPLY:
                                678                 :           4388 :             process_syncing_tables_for_apply(current_lsn);
                                679                 :           4382 :             break;
                                680                 :                : 
  236 akapila@postgresql.o      681                 :UNC           0 :         case WORKERTYPE_UNKNOWN:
                                682                 :                :             /* Should never happen. */
                                683         [ #  # ]:              0 :             elog(ERROR, "Unknown worker type");
                                684                 :                :     }
 2579 peter_e@gmx.net           685                 :GIC        4452 : }
                                686                 :                : 
                                687                 :                : /*
                                688                 :                :  * Create list of columns for COPY based on logical relation mapping.
                                689                 :                :  */
                                690                 :                : static List *
 2579 peter_e@gmx.net           691                 :CBC         168 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
                                692                 :                : {
                                693                 :            168 :     List       *attnamelist = NIL;
                                694                 :                :     int         i;
                                695                 :                : 
 2523                           696         [ +  + ]:            437 :     for (i = 0; i < rel->remoterel.natts; i++)
                                697                 :                :     {
 2579                           698                 :            269 :         attnamelist = lappend(attnamelist,
 2523                           699                 :            269 :                               makeString(rel->remoterel.attnames[i]));
                                700                 :                :     }
                                701                 :                : 
                                702                 :                : 
 2579                           703                 :            168 :     return attnamelist;
                                704                 :                : }
                                705                 :                : 
                                706                 :                : /*
                                707                 :                :  * Data source callback for the COPY FROM, which reads from the remote
                                708                 :                :  * connection and passes the data back to our local COPY.
                                709                 :                :  */
                                710                 :                : static int
                                711                 :          13938 : copy_read_data(void *outbuf, int minread, int maxread)
                                712                 :                : {
 2524 bruce@momjian.us          713                 :          13938 :     int         bytesread = 0;
                                714                 :                :     int         avail;
                                715                 :                : 
                                716                 :                :     /* If there is some leftover data from the previous read, use it. */
 2579 peter_e@gmx.net           717                 :          13938 :     avail = copybuf->len - copybuf->cursor;
                                718         [ -  + ]:          13938 :     if (avail)
                                719                 :                :     {
 2579 peter_e@gmx.net           720         [ #  # ]:UBC           0 :         if (avail > maxread)
                                721                 :              0 :             avail = maxread;
                                722                 :              0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                                723                 :              0 :         copybuf->cursor += avail;
                                724                 :              0 :         maxread -= avail;
                                725                 :              0 :         bytesread += avail;
                                726                 :                :     }
                                727                 :                : 
 2508 peter_e@gmx.net           728   [ +  -  +  - ]:CBC       13938 :     while (maxread > 0 && bytesread < minread)
                                729                 :                :     {
 2579                           730                 :          13938 :         pgsocket    fd = PGINVALID_SOCKET;
                                731                 :                :         int         len;
                                732                 :          13938 :         char       *buf = NULL;
                                733                 :                : 
                                734                 :                :         for (;;)
                                735                 :                :         {
                                736                 :                :             /* Try to read the data. */
 1068 alvherre@alvh.no-ip.      737                 :UBC           0 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                                738                 :                : 
 2579 peter_e@gmx.net           739         [ -  + ]:CBC       13938 :             CHECK_FOR_INTERRUPTS();
                                740                 :                : 
                                741         [ -  + ]:          13938 :             if (len == 0)
 2579 peter_e@gmx.net           742                 :UBC           0 :                 break;
 2579 peter_e@gmx.net           743         [ +  + ]:CBC       13938 :             else if (len < 0)
                                744                 :          13938 :                 return bytesread;
                                745                 :                :             else
                                746                 :                :             {
                                747                 :                :                 /* Process the data */
                                748                 :          13772 :                 copybuf->data = buf;
                                749                 :          13772 :                 copybuf->len = len;
                                750                 :          13772 :                 copybuf->cursor = 0;
                                751                 :                : 
                                752                 :          13772 :                 avail = copybuf->len - copybuf->cursor;
                                753         [ -  + ]:          13772 :                 if (avail > maxread)
 2579 peter_e@gmx.net           754                 :UBC           0 :                     avail = maxread;
 2579 peter_e@gmx.net           755                 :CBC       13772 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                                756                 :          13772 :                 outbuf = (void *) ((char *) outbuf + avail);
                                757                 :          13772 :                 copybuf->cursor += avail;
                                758                 :          13772 :                 maxread -= avail;
                                759                 :          13772 :                 bytesread += avail;
                                760                 :                :             }
                                761                 :                : 
                                762   [ +  -  +  - ]:          13772 :             if (maxread <= 0 || bytesread >= minread)
                                763                 :          13772 :                 return bytesread;
                                764                 :                :         }
                                765                 :                : 
                                766                 :                :         /*
                                767                 :                :          * Wait for more data or latch.
                                768                 :                :          */
 1969 tmunro@postgresql.or      769                 :UBC           0 :         (void) WaitLatchOrSocket(MyLatch,
                                770                 :                :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
                                771                 :                :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                772                 :                :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
                                773                 :                : 
 2504 andres@anarazel.de        774                 :              0 :         ResetLatch(MyLatch);
                                775                 :                :     }
                                776                 :                : 
 2579 peter_e@gmx.net           777                 :              0 :     return bytesread;
                                778                 :                : }
                                779                 :                : 
                                780                 :                : 
                                781                 :                : /*
                                782                 :                :  * Get information about the remote relation, in a similar fashion to what the
                                783                 :                :  * RELATION message provides during replication. This function also returns
                                784                 :                :  * the relation qualifications to be used in the COPY command.
                                785                 :                :  */
                                786                 :                : static void
 2579 peter_e@gmx.net           787                 :CBC         169 : fetch_remote_table_info(char *nspname, char *relname,
                                788                 :                :                         LogicalRepRelation *lrel, List **qual)
                                789                 :                : {
                                790                 :                :     WalRcvExecResult *res;
                                791                 :                :     StringInfoData cmd;
                                792                 :                :     TupleTableSlot *slot;
 1487 peter@eisentraut.org      793                 :            169 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
  750 tomas.vondra@postgre      794                 :            169 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
  782 akapila@postgresql.o      795                 :            169 :     Oid         qualRow[] = {TEXTOID};
                                796                 :                :     bool        isnull;
                                797                 :                :     int         natt;
                                798                 :                :     ListCell   *lc;
  750 tomas.vondra@postgre      799                 :            169 :     Bitmapset  *included_cols = NULL;
                                800                 :                : 
 2579 peter_e@gmx.net           801                 :            169 :     lrel->nspname = nspname;
                                802                 :            169 :     lrel->relname = relname;
                                803                 :                : 
                                804                 :                :     /* First fetch Oid, replica identity, and relkind. */
                                805                 :            169 :     initStringInfo(&cmd);
 1487 peter@eisentraut.org      806                 :            169 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
                                807                 :                :                      "  FROM pg_catalog.pg_class c"
                                808                 :                :                      "  INNER JOIN pg_catalog.pg_namespace n"
                                809                 :                :                      "        ON (c.relnamespace = n.oid)"
                                810                 :                :                      " WHERE n.nspname = %s"
                                811                 :                :                      "   AND c.relname = %s",
                                812                 :                :                      quote_literal_cstr(nspname),
                                813                 :                :                      quote_literal_cstr(relname));
 1068 alvherre@alvh.no-ip.      814                 :            169 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                815                 :                :                       lengthof(tableRow), tableRow);
                                816                 :                : 
 2579 peter_e@gmx.net           817         [ -  + ]:            168 :     if (res->status != WALRCV_OK_TUPLES)
 2579 peter_e@gmx.net           818         [ #  # ]:UBC           0 :         ereport(ERROR,
                                819                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                820                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                                821                 :                :                         nspname, relname, res->err)));
                                822                 :                : 
 1977 andres@anarazel.de        823                 :CBC         168 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 2579 peter_e@gmx.net           824         [ -  + ]:            168 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 2579 peter_e@gmx.net           825         [ #  # ]:UBC           0 :         ereport(ERROR,
                                826                 :                :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                                827                 :                :                  errmsg("table \"%s.%s\" not found on publisher",
                                828                 :                :                         nspname, relname)));
                                829                 :                : 
 2579 peter_e@gmx.net           830                 :CBC         168 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
                                831         [ -  + ]:            168 :     Assert(!isnull);
                                832                 :            168 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
                                833         [ -  + ]:            168 :     Assert(!isnull);
 1487 peter@eisentraut.org      834                 :            168 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
                                835         [ -  + ]:            168 :     Assert(!isnull);
                                836                 :                : 
 2579 peter_e@gmx.net           837                 :            168 :     ExecDropSingleTupleTableSlot(slot);
                                838                 :            168 :     walrcv_clear_result(res);
                                839                 :                : 
                                840                 :                : 
                                841                 :                :     /*
                                842                 :                :      * Get column lists for each relation.
                                843                 :                :      *
                                844                 :                :      * We need to do this before fetching info about column names and types,
                                845                 :                :      * so that we can skip columns that should not be replicated.
                                846                 :                :      */
  750 tomas.vondra@postgre      847         [ +  - ]:            168 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
                                848                 :                :     {
                                849                 :                :         WalRcvExecResult *pubres;
                                850                 :                :         TupleTableSlot *tslot;
  682 akapila@postgresql.o      851                 :            168 :         Oid         attrsRow[] = {INT2VECTOROID};
                                852                 :                :         StringInfoData pub_names;
                                853                 :                : 
  750 tomas.vondra@postgre      854                 :            168 :         initStringInfo(&pub_names);
                                855   [ +  -  +  +  :            522 :         foreach(lc, MySubscription->publications)
                                              +  + ]
                                856                 :                :         {
  603 drowley@postgresql.o      857         [ +  + ]:            354 :             if (foreach_current_index(lc) > 0)
  586                           858                 :            186 :                 appendStringInfoString(&pub_names, ", ");
  750 tomas.vondra@postgre      859                 :            354 :             appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
                                860                 :                :         }
                                861                 :                : 
                                862                 :                :         /*
                                863                 :                :          * Fetch info about column lists for the relation (from all the
                                864                 :                :          * publications).
                                865                 :                :          */
                                866                 :            168 :         resetStringInfo(&cmd);
                                867                 :            168 :         appendStringInfo(&cmd,
                                868                 :                :                          "SELECT DISTINCT"
                                869                 :                :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
                                870                 :                :                          "   THEN NULL ELSE gpt.attrs END)"
                                871                 :                :                          "  FROM pg_publication p,"
                                872                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
                                873                 :                :                          "  pg_class c"
                                874                 :                :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
                                875                 :                :                          "   AND p.pubname IN ( %s )",
                                876                 :                :                          lrel->remoteid,
                                877                 :                :                          pub_names.data);
                                878                 :                : 
                                879                 :            168 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                880                 :                :                              lengthof(attrsRow), attrsRow);
                                881                 :                : 
                                882         [ -  + ]:            168 :         if (pubres->status != WALRCV_OK_TUPLES)
  750 tomas.vondra@postgre      883         [ #  # ]:UBC           0 :             ereport(ERROR,
                                884                 :                :                     (errcode(ERRCODE_CONNECTION_FAILURE),
                                885                 :                :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
                                886                 :                :                             nspname, relname, pubres->err)));
                                887                 :                : 
                                888                 :                :         /*
                                889                 :                :          * We don't support the case where the column list is different for
                                890                 :                :          * the same table when combining publications. See comments atop
                                891                 :                :          * fetch_table_list. So there should be only one row returned.
                                892                 :                :          * Although we already checked this when creating the subscription, we
                                893                 :                :          * still need to check here in case the column list was changed after
                                894                 :                :          * creating the subscription and before the sync worker is started.
                                895                 :                :          */
  682 akapila@postgresql.o      896         [ -  + ]:CBC         168 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
  682 akapila@postgresql.o      897         [ #  # ]:UBC           0 :             ereport(ERROR,
                                898                 :                :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                899                 :                :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                                900                 :                :                            nspname, relname));
                                901                 :                : 
                                902                 :                :         /*
                                903                 :                :          * Get the column list and build a single bitmap with the attnums.
                                904                 :                :          *
                                905                 :                :          * If we find a NULL value, it means all the columns should be
                                906                 :                :          * replicated.
                                907                 :                :          */
  603 drowley@postgresql.o      908                 :CBC         168 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
                                909         [ +  - ]:            168 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
                                910                 :                :         {
                                911                 :            168 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
                                912                 :                : 
  682 akapila@postgresql.o      913         [ +  + ]:            168 :             if (!isnull)
                                914                 :                :             {
                                915                 :                :                 ArrayType  *arr;
                                916                 :                :                 int         nelems;
                                917                 :                :                 int16      *elems;
                                918                 :                : 
                                919                 :             19 :                 arr = DatumGetArrayTypeP(cfval);
                                920                 :             19 :                 nelems = ARR_DIMS(arr)[0];
                                921         [ -  + ]:             19 :                 elems = (int16 *) ARR_DATA_PTR(arr);
                                922                 :                : 
                                923         [ +  + ]:             53 :                 for (natt = 0; natt < nelems; natt++)
                                924                 :             34 :                     included_cols = bms_add_member(included_cols, elems[natt]);
                                925                 :                :             }
                                926                 :                : 
  603 drowley@postgresql.o      927                 :            168 :             ExecClearTuple(tslot);
                                928                 :                :         }
                                929                 :            168 :         ExecDropSingleTupleTableSlot(tslot);
                                930                 :                : 
  750 tomas.vondra@postgre      931                 :            168 :         walrcv_clear_result(pubres);
                                932                 :                : 
                                933                 :            168 :         pfree(pub_names.data);
                                934                 :                :     }
                                935                 :                : 
                                936                 :                :     /*
                                937                 :                :      * Now fetch column names and types.
                                938                 :                :      */
 2579 peter_e@gmx.net           939                 :            168 :     resetStringInfo(&cmd);
                                940         [ +  - ]:            168 :     appendStringInfo(&cmd,
                                941                 :                :                      "SELECT a.attnum,"
                                942                 :                :                      "       a.attname,"
                                943                 :                :                      "       a.atttypid,"
                                944                 :                :                      "       a.attnum = ANY(i.indkey)"
                                945                 :                :                      "  FROM pg_catalog.pg_attribute a"
                                946                 :                :                      "  LEFT JOIN pg_catalog.pg_index i"
                                947                 :                :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
                                948                 :                :                      " WHERE a.attnum > 0::pg_catalog.int2"
                                949                 :                :                      "   AND NOT a.attisdropped %s"
                                950                 :                :                      "   AND a.attrelid = %u"
                                951                 :                :                      " ORDER BY a.attnum",
                                952                 :                :                      lrel->remoteid,
 1068 alvherre@alvh.no-ip.      953                 :            168 :                      (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
                                954                 :                :                       "AND a.attgenerated = ''" : ""),
                                955                 :                :                      lrel->remoteid);
                                956                 :            168 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                957                 :                :                       lengthof(attrRow), attrRow);
                                958                 :                : 
 2579 peter_e@gmx.net           959         [ -  + ]:            168 :     if (res->status != WALRCV_OK_TUPLES)
 2579 peter_e@gmx.net           960         [ #  # ]:UBC           0 :         ereport(ERROR,
                                961                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                962                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                                963                 :                :                         nspname, relname, res->err)));
                                964                 :                : 
                                 965                 :                :     /* We don't know how many rows are coming, so allocate for the maximum. */
 2579 peter_e@gmx.net           966                 :CBC         168 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
                                967                 :            168 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
                                968                 :            168 :     lrel->attkeys = NULL;
                                969                 :                : 
                                970                 :                :     /*
                                971                 :                :      * Store the columns as a list of names.  Ignore those that are not
                                972                 :                :      * present in the column list, if there is one.
                                973                 :                :      */
                                974                 :            168 :     natt = 0;
 1977 andres@anarazel.de        975                 :            168 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 2579 peter_e@gmx.net           976         [ +  + ]:            459 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                                977                 :                :     {
                                978                 :                :         char       *rel_colname;
                                979                 :                :         AttrNumber  attnum;
                                980                 :                : 
  750 tomas.vondra@postgre      981                 :            291 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
                                982         [ -  + ]:            291 :         Assert(!isnull);
                                983                 :                : 
                                984                 :                :         /* If the column is not in the column list, skip it. */
                                985   [ +  +  +  + ]:            291 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
                                986                 :                :         {
                                987                 :             22 :             ExecClearTuple(slot);
                                988                 :             22 :             continue;
                                989                 :                :         }
                                990                 :                : 
                                991                 :            269 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 2579 peter_e@gmx.net           992         [ -  + ]:            269 :         Assert(!isnull);
                                993                 :                : 
  750 tomas.vondra@postgre      994                 :            269 :         lrel->attnames[natt] = rel_colname;
                                995                 :            269 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 2579 peter_e@gmx.net           996         [ -  + ]:            269 :         Assert(!isnull);
                                997                 :                : 
  750 tomas.vondra@postgre      998         [ +  + ]:            269 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 2579 peter_e@gmx.net           999                 :             97 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
                               1000                 :                : 
                               1001                 :                :         /* Should never happen. */
                               1002         [ -  + ]:            269 :         if (++natt >= MaxTupleAttributeNumber)
 2579 peter_e@gmx.net          1003         [ #  # ]:UBC           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
                               1004                 :                :                  nspname, relname);
                               1005                 :                : 
 2579 peter_e@gmx.net          1006                 :CBC         269 :         ExecClearTuple(slot);
                               1007                 :                :     }
                               1008                 :            168 :     ExecDropSingleTupleTableSlot(slot);
                               1009                 :                : 
                               1010                 :            168 :     lrel->natts = natt;
                               1011                 :                : 
                               1012                 :            168 :     walrcv_clear_result(res);
                               1013                 :                : 
                               1014                 :                :     /*
                               1015                 :                :      * Get relation's row filter expressions. DISTINCT avoids the same
                               1016                 :                :      * expression of a table in multiple publications from being included
                               1017                 :                :      * multiple times in the final expression.
                               1018                 :                :      *
                               1019                 :                :      * We need to copy the row even if it matches just one of the
                               1020                 :                :      * publications, so we later combine all the quals with OR.
                               1021                 :                :      *
                                1022                 :                :      * For initial synchronization, row filtering can be ignored in the
                                1023                 :                :      * following cases:
                               1024                 :                :      *
                               1025                 :                :      * 1) one of the subscribed publications for the table hasn't specified
                               1026                 :                :      * any row filter
                               1027                 :                :      *
                               1028                 :                :      * 2) one of the subscribed publications has puballtables set to true
                               1029                 :                :      *
                               1030                 :                :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
                               1031                 :                :      * that includes this relation
                               1032                 :                :      */
  782 akapila@postgresql.o     1033         [ +  - ]:            168 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
                               1034                 :                :     {
                               1035                 :                :         StringInfoData pub_names;
                               1036                 :                : 
                               1037                 :                :         /* Build the pubname list. */
                               1038                 :            168 :         initStringInfo(&pub_names);
  101 nathan@postgresql.or     1039   [ +  -  +  +  :GNC         690 :         foreach_node(String, pubstr, MySubscription->publications)
                                              +  + ]
                               1040                 :                :         {
                               1041                 :            354 :             char       *pubname = strVal(pubstr);
                               1042                 :                : 
                               1043         [ +  + ]:            354 :             if (foreach_current_index(pubstr) > 0)
  782 akapila@postgresql.o     1044                 :CBC         186 :                 appendStringInfoString(&pub_names, ", ");
                               1045                 :                : 
                               1046                 :            354 :             appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
                               1047                 :                :         }
                               1048                 :                : 
                               1049                 :                :         /* Check for row filters. */
                               1050                 :            168 :         resetStringInfo(&cmd);
                               1051                 :            168 :         appendStringInfo(&cmd,
                               1052                 :                :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
                               1053                 :                :                          "  FROM pg_publication p,"
                               1054                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
                               1055                 :                :                          " WHERE gpt.relid = %u"
                               1056                 :                :                          "   AND p.pubname IN ( %s )",
                               1057                 :                :                          lrel->remoteid,
                               1058                 :                :                          pub_names.data);
                               1059                 :                : 
                               1060                 :            168 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
                               1061                 :                : 
                               1062         [ -  + ]:            168 :         if (res->status != WALRCV_OK_TUPLES)
  782 akapila@postgresql.o     1063         [ #  # ]:UBC           0 :             ereport(ERROR,
                               1064                 :                :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
                               1065                 :                :                             nspname, relname, res->err)));
                               1066                 :                : 
                               1067                 :                :         /*
                               1068                 :                :          * Multiple row filter expressions for the same table will be combined
                               1069                 :                :          * by COPY using OR. If any of the filter expressions for this table
                               1070                 :                :          * are null, it means the whole table will be copied. In this case it
                               1071                 :                :          * is not necessary to construct a unified row filter expression at
                               1072                 :                :          * all.
                               1073                 :                :          */
  782 akapila@postgresql.o     1074                 :CBC         168 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
                               1075         [ +  + ]:            182 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                               1076                 :                :         {
                               1077                 :            172 :             Datum       rf = slot_getattr(slot, 1, &isnull);
                               1078                 :                : 
                               1079         [ +  + ]:            172 :             if (!isnull)
                               1080                 :             14 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
                               1081                 :                :             else
                               1082                 :                :             {
                                1083                 :                :                 /* Ignore filters and clean up as necessary. */
                               1084         [ +  + ]:            158 :                 if (*qual)
                               1085                 :                :                 {
                               1086                 :              3 :                     list_free_deep(*qual);
                               1087                 :              3 :                     *qual = NIL;
                               1088                 :                :                 }
                               1089                 :            158 :                 break;
                               1090                 :                :             }
                               1091                 :                : 
                               1092                 :             14 :             ExecClearTuple(slot);
                               1093                 :                :         }
                               1094                 :            168 :         ExecDropSingleTupleTableSlot(slot);
                               1095                 :                : 
                               1096                 :            168 :         walrcv_clear_result(res);
                               1097                 :                :     }
                               1098                 :                : 
 2579 peter_e@gmx.net          1099                 :            168 :     pfree(cmd.data);
                               1100                 :            168 : }
                               1101                 :                : 
                               1102                 :                : /*
                               1103                 :                :  * Copy existing data of a table from publisher.
                               1104                 :                :  *
                               1105                 :                :  * Caller is responsible for locking the local relation.
                               1106                 :                :  */
                               1107                 :                : static void
                               1108                 :            169 : copy_table(Relation rel)
                               1109                 :                : {
                               1110                 :                :     LogicalRepRelMapEntry *relmapentry;
                               1111                 :                :     LogicalRepRelation lrel;
  782 akapila@postgresql.o     1112                 :            169 :     List       *qual = NIL;
                               1113                 :                :     WalRcvExecResult *res;
                               1114                 :                :     StringInfoData cmd;
                               1115                 :                :     CopyFromState cstate;
                               1116                 :                :     List       *attnamelist;
                               1117                 :                :     ParseState *pstate;
  388                          1118                 :            169 :     List       *options = NIL;
                               1119                 :                : 
                               1120                 :                :     /* Get the publisher relation info. */
 2579 peter_e@gmx.net          1121                 :            169 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
  782 akapila@postgresql.o     1122                 :            169 :                             RelationGetRelationName(rel), &lrel, &qual);
                               1123                 :                : 
                               1124                 :                :     /* Put the relation into relmap. */
 2579 peter_e@gmx.net          1125                 :            168 :     logicalrep_relmap_update(&lrel);
                               1126                 :                : 
                               1127                 :                :     /* Map the publisher relation to local one. */
                               1128                 :            168 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
                               1129         [ -  + ]:            168 :     Assert(rel == relmapentry->localrel);
                               1130                 :                : 
                               1131                 :                :     /* Start copy on the publisher. */
                               1132                 :            168 :     initStringInfo(&cmd);
                               1133                 :                : 
                               1134                 :                :     /* Regular table with no row filter */
  782 akapila@postgresql.o     1135   [ +  +  +  + ]:            168 :     if (lrel.relkind == RELKIND_RELATION && qual == NIL)
                               1136                 :                :     {
  144                          1137                 :            144 :         appendStringInfo(&cmd, "COPY %s",
 1487 peter@eisentraut.org     1138                 :            144 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1139                 :                : 
                               1140                 :                :         /* If the table has columns, then specify the columns */
  144 akapila@postgresql.o     1141         [ +  + ]:            144 :         if (lrel.natts)
                               1142                 :                :         {
                               1143                 :            143 :             appendStringInfoString(&cmd, " (");
                               1144                 :                : 
                               1145                 :                :             /*
                               1146                 :                :              * XXX Do we need to list the columns in all cases? Maybe we're
                               1147                 :                :              * replicating all columns?
                               1148                 :                :              */
                               1149         [ +  + ]:            376 :             for (int i = 0; i < lrel.natts; i++)
                               1150                 :                :             {
                               1151         [ +  + ]:            233 :                 if (i > 0)
                               1152                 :             90 :                     appendStringInfoString(&cmd, ", ");
                               1153                 :                : 
                               1154                 :            233 :                 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1155                 :                :             }
                               1156                 :                : 
    4 drowley@postgresql.o     1157                 :GNC         143 :             appendStringInfoChar(&cmd, ')');
                               1158                 :                :         }
                               1159                 :                : 
  144 akapila@postgresql.o     1160                 :CBC         144 :         appendStringInfoString(&cmd, " TO STDOUT");
                               1161                 :                :     }
                               1162                 :                :     else
                               1163                 :                :     {
                               1164                 :                :         /*
                               1165                 :                :          * For non-tables and tables with row filters, we need to do COPY
                               1166                 :                :          * (SELECT ...), but we can't just do SELECT * because we need to not
                               1167                 :                :          * copy generated columns. For tables with any row filters, build a
                               1168                 :                :          * SELECT query with OR'ed row filters for COPY.
                               1169                 :                :          */
 1277 drowley@postgresql.o     1170                 :             24 :         appendStringInfoString(&cmd, "COPY (SELECT ");
 1487 peter@eisentraut.org     1171         [ +  + ]:             60 :         for (int i = 0; i < lrel.natts; i++)
                               1172                 :                :         {
                               1173                 :             36 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1174         [ +  + ]:             36 :             if (i < lrel.natts - 1)
                               1175                 :             12 :                 appendStringInfoString(&cmd, ", ");
                               1176                 :                :         }
                               1177                 :                : 
  782 akapila@postgresql.o     1178                 :             24 :         appendStringInfoString(&cmd, " FROM ");
                               1179                 :                : 
                               1180                 :                :         /*
                               1181                 :                :          * For regular tables, make sure we don't copy data from a child that
                               1182                 :                :          * inherits the named table as those will be copied separately.
                               1183                 :                :          */
                               1184         [ +  + ]:             24 :         if (lrel.relkind == RELKIND_RELATION)
                               1185                 :              7 :             appendStringInfoString(&cmd, "ONLY ");
                               1186                 :                : 
                               1187                 :             24 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1188                 :                :         /* list of OR'ed filters */
                               1189         [ +  + ]:             24 :         if (qual != NIL)
                               1190                 :                :         {
                               1191                 :                :             ListCell   *lc;
                               1192                 :             10 :             char       *q = strVal(linitial(qual));
                               1193                 :                : 
                               1194                 :             10 :             appendStringInfo(&cmd, " WHERE %s", q);
                               1195   [ +  -  +  +  :             11 :             for_each_from(lc, qual, 1)
                                              +  + ]
                               1196                 :                :             {
                               1197                 :              1 :                 q = strVal(lfirst(lc));
                               1198                 :              1 :                 appendStringInfo(&cmd, " OR %s", q);
                               1199                 :                :             }
                               1200                 :             10 :             list_free_deep(qual);
                               1201                 :                :         }
                               1202                 :                : 
                               1203                 :             24 :         appendStringInfoString(&cmd, ") TO STDOUT");
                               1204                 :                :     }
                               1205                 :                : 
                               1206                 :                :     /*
                               1207                 :                :      * Prior to v16, initial table synchronization will use text format even
                               1208                 :                :      * if the binary option is enabled for a subscription.
                               1209                 :                :      */
  388                          1210         [ +  - ]:            168 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
                               1211         [ +  + ]:            168 :         MySubscription->binary)
                               1212                 :                :     {
                               1213                 :              5 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
                               1214                 :              5 :         options = list_make1(makeDefElem("format",
                               1215                 :                :                                          (Node *) makeString("binary"), -1));
                               1216                 :                :     }
                               1217                 :                : 
 1068 alvherre@alvh.no-ip.     1218                 :            168 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 2579 peter_e@gmx.net          1219                 :            168 :     pfree(cmd.data);
                               1220         [ -  + ]:            168 :     if (res->status != WALRCV_OK_COPY_OUT)
 2579 peter_e@gmx.net          1221         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1222                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1223                 :                :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                               1224                 :                :                         lrel.nspname, lrel.relname, res->err)));
 2579 peter_e@gmx.net          1225                 :CBC         168 :     walrcv_clear_result(res);
                               1226                 :                : 
                               1227                 :            168 :     copybuf = makeStringInfo();
                               1228                 :                : 
 2554                          1229                 :            168 :     pstate = make_parsestate(NULL);
 1564 tgl@sss.pgh.pa.us        1230                 :            168 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                               1231                 :                :                                          NULL, false, false);
                               1232                 :                : 
 2579 peter_e@gmx.net          1233                 :            168 :     attnamelist = make_copy_attnamelist(relmapentry);
  388 akapila@postgresql.o     1234                 :            168 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
                               1235                 :                : 
                               1236                 :                :     /* Do the copy */
 2579 peter_e@gmx.net          1237                 :            167 :     (void) CopyFrom(cstate);
                               1238                 :                : 
                               1239                 :            160 :     logicalrep_rel_close(relmapentry, NoLock);
                               1240                 :            160 : }
                               1241                 :                : 
                               1242                 :                : /*
                               1243                 :                :  * Determine the tablesync slot name.
                               1244                 :                :  *
                               1245                 :                :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
                               1246                 :                :  * on slot name length. We append system_identifier to avoid slot_name
                               1247                 :                :  * collision with subscriptions in other clusters. With the current scheme
                               1248                 :                :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
                               1249                 :                :  * length of slot_name will be 50.
                               1250                 :                :  *
                               1251                 :                :  * The returned slot name is stored in the supplied buffer (syncslotname) with
                               1252                 :                :  * the given size.
                               1253                 :                :  *
                               1254                 :                :  * Note: We don't use the subscription slot name as part of tablesync slot name
                               1255                 :                :  * because we are responsible for cleaning up these slots and it could become
                               1256                 :                :  * impossible to recalculate what name to clean up if the subscription slot name
                               1257                 :                :  * had changed.
                               1258                 :                :  */
                               1259                 :                : void
 1157 akapila@postgresql.o     1260                 :            331 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                               1261                 :                :                                 char *syncslotname, Size szslot)
                               1262                 :                : {
 1154                          1263                 :            331 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
                               1264                 :                :              relid, GetSystemIdentifier());
 1157                          1265                 :            331 : }
                               1266                 :                : 
                               1267                 :                : /*
                               1268                 :                :  * Start syncing the table in the sync worker.
                               1269                 :                :  *
                               1270                 :                :  * If nothing needs to be done to sync the table, we exit the worker without
                               1271                 :                :  * any further action.
                               1272                 :                :  *
                               1273                 :                :  * The returned slot name is palloc'ed in current memory context.
                               1274                 :                :  */
                               1275                 :                : static char *
 2579 peter_e@gmx.net          1276                 :            169 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                               1277                 :                : {
                               1278                 :                :     char       *slotname;
                               1279                 :                :     char       *err;
                               1280                 :                :     char        relstate;
                               1281                 :                :     XLogRecPtr  relstate_lsn;
                               1282                 :                :     Relation    rel;
                               1283                 :                :     AclResult   aclresult;
                               1284                 :                :     WalRcvExecResult *res;
                               1285                 :                :     char        originname[NAMEDATALEN];
                               1286                 :                :     RepOriginId originid;
                               1287                 :                :     UserContext ucxt;
                               1288                 :                :     bool        must_use_password;
                               1289                 :                :     bool        run_as_owner;
                               1290                 :                : 
                               1291                 :                :     /* Check the state of the table synchronization. */
                               1292                 :            169 :     StartTransactionCommand();
 2551 fujii@postgresql.org     1293                 :            169 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                               1294                 :            169 :                                        MyLogicalRepWorker->relid,
                               1295                 :                :                                        &relstate_lsn);
  180 akapila@postgresql.o     1296                 :GNC         169 :     CommitTransactionCommand();
                               1297                 :                : 
                               1298                 :                :     /* Is the use of a password mandatory? */
  304 akapila@postgresql.o     1299         [ +  + ]:CBC         334 :     must_use_password = MySubscription->passwordrequired &&
  180 akapila@postgresql.o     1300         [ -  + ]:GNC         165 :         !MySubscription->ownersuperuser;
                               1301                 :                : 
 2579 peter_e@gmx.net          1302         [ -  + ]:CBC         169 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 2551 fujii@postgresql.org     1303                 :            169 :     MyLogicalRepWorker->relstate = relstate;
                               1304                 :            169 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
 2579 peter_e@gmx.net          1305                 :            169 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1306                 :                : 
                               1307                 :                :     /*
                               1308                 :                :      * If synchronization is already done or no longer necessary, exit now
                               1309                 :                :      * that we've updated shared memory state.
                               1310                 :                :      */
 1277 alvherre@alvh.no-ip.     1311         [ -  + ]:            169 :     switch (relstate)
                               1312                 :                :     {
 1277 alvherre@alvh.no-ip.     1313                 :UBC           0 :         case SUBREL_STATE_SYNCDONE:
                               1314                 :                :         case SUBREL_STATE_READY:
                               1315                 :                :         case SUBREL_STATE_UNKNOWN:
                               1316                 :              0 :             finish_sync_worker();   /* doesn't return */
                               1317                 :                :     }
                               1318                 :                : 
                               1319                 :                :     /* Calculate the name of the tablesync slot. */
 1154 akapila@postgresql.o     1320                 :CBC         169 :     slotname = (char *) palloc(NAMEDATALEN);
                               1321                 :            169 :     ReplicationSlotNameForTablesync(MySubscription->oid,
                               1322                 :            169 :                                     MyLogicalRepWorker->relid,
                               1323                 :                :                                     slotname,
                               1324                 :                :                                     NAMEDATALEN);
                               1325                 :                : 
                               1326                 :                :     /*
                               1327                 :                :      * Here we use the slot name instead of the subscription name as the
                               1328                 :                :      * application_name so that it differs from the leader apply worker's,
                               1329                 :                :      * which lets synchronous replication distinguish them.
                               1330                 :                :      */
 1068 alvherre@alvh.no-ip.     1331                 :            169 :     LogRepWorkerWalRcvConn =
   69 akapila@postgresql.o     1332                 :GNC         169 :         walrcv_connect(MySubscription->conninfo, true, true,
                               1333                 :                :                        must_use_password,
                               1334                 :                :                        slotname, &err);
 1068 alvherre@alvh.no-ip.     1335         [ -  + ]:CBC         169 :     if (LogRepWorkerWalRcvConn == NULL)
 2579 peter_e@gmx.net          1336         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1337                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1338                 :                :                  errmsg("could not connect to the publisher: %s", err)));
                               1339                 :                : 
 1277 alvherre@alvh.no-ip.     1340   [ +  +  -  +  :CBC         169 :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
                                              -  - ]
                               1341                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
                               1342                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
                               1343                 :                : 
                               1344                 :                :     /* Assign the origin tracking record name. */
  551 akapila@postgresql.o     1345                 :            169 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1346                 :            169 :                                        MyLogicalRepWorker->relid,
                               1347                 :                :                                        originname,
                               1348                 :                :                                        sizeof(originname));
                               1349                 :                : 
 1157                          1350         [ +  + ]:            169 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
                               1351                 :                :     {
                               1352                 :                :         /*
                               1353                 :                :          * We have previously errored out before finishing the copy so the
                               1354                 :                :          * replication slot might exist. We want to remove the slot if it
                               1355                 :                :          * already exists and proceed.
                               1356                 :                :          *
                               1357                 :                :          * XXX We could instead try to drop the slot at the time of the failure,
                               1358                 :                :          * but for that we might need to clean up the copy state, as it might
                               1359                 :                :          * be in the middle of fetching the rows. Also, if there is a network
                               1360                 :                :          * breakdown then the drop wouldn't have succeeded, so trying it next
                               1361                 :                :          * time seems like a better bet.
                               1362                 :                :          */
 1068 alvherre@alvh.no-ip.     1363                 :              7 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
                               1364                 :                :     }
 1157 akapila@postgresql.o     1365         [ -  + ]:            162 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
                               1366                 :                :     {
                               1367                 :                :         /*
                               1368                 :                :          * The COPY phase was previously done, but tablesync then crashed
                               1369                 :                :          * before it was able to finish normally.
                               1370                 :                :          */
 1157 akapila@postgresql.o     1371                 :UBC           0 :         StartTransactionCommand();
                               1372                 :                : 
                               1373                 :                :         /*
                               1374                 :                :          * The origin tracking name must already exist. It was created the
                               1375                 :                :          * first time this tablesync was launched.
                               1376                 :                :          */
                               1377                 :              0 :         originid = replorigin_by_name(originname, false);
  461                          1378                 :              0 :         replorigin_session_setup(originid, 0);
 1157                          1379                 :              0 :         replorigin_session_origin = originid;
                               1380                 :              0 :         *origin_startpos = replorigin_session_get_progress(false);
                               1381                 :                : 
                               1382                 :              0 :         CommitTransactionCommand();
                               1383                 :                : 
                               1384                 :              0 :         goto copy_table_done;
                               1385                 :                :     }
                               1386                 :                : 
 1277 alvherre@alvh.no-ip.     1387         [ -  + ]:CBC         169 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1388                 :            169 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
                               1389                 :            169 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
                               1390                 :            169 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1391                 :                : 
                               1392                 :                :     /* Update the state and make it visible to others. */
                               1393                 :            169 :     StartTransactionCommand();
                               1394                 :            169 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1395                 :            169 :                                MyLogicalRepWorker->relid,
                               1396                 :            169 :                                MyLogicalRepWorker->relstate,
                               1397                 :            169 :                                MyLogicalRepWorker->relstate_lsn);
                               1398                 :            169 :     CommitTransactionCommand();
  739 andres@anarazel.de       1399                 :            169 :     pgstat_report_stat(true);
                               1400                 :                : 
 1277 alvherre@alvh.no-ip.     1401                 :            169 :     StartTransactionCommand();
                               1402                 :                : 
                               1403                 :                :     /*
                               1404                 :                :      * Use a standard write lock here. It might be better to disallow access
                               1405                 :                :      * to the table while it's being synchronized. But we don't want to block
                               1406                 :                :      * the main apply process from working and it has to open the relation in
                               1407                 :                :      * RowExclusiveLock when remapping remote relation id to local one.
                               1408                 :                :      */
                               1409                 :            169 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
                               1410                 :                : 
                               1411                 :                :     /*
                               1412                 :                :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
                               1413                 :                :      * ensures that both the replication slot we create (see below) and the
                               1414                 :                :      * COPY are consistent with each other.
                               1415                 :                :      */
 1068                          1416                 :            169 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
                               1417                 :                :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                               1418                 :                :                       0, NULL);
 1277                          1419         [ -  + ]:            169 :     if (res->status != WALRCV_OK_COMMAND)
 1277 alvherre@alvh.no-ip.     1420         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1421                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1422                 :                :                  errmsg("table copy could not start transaction on publisher: %s",
                               1423                 :                :                         res->err)));
 1277 alvherre@alvh.no-ip.     1424                 :CBC         169 :     walrcv_clear_result(res);
                               1425                 :                : 
                               1426                 :                :     /*
                               1427                 :                :      * Create a new permanent logical decoding slot. This slot will be used
                               1428                 :                :      * for the catchup phase after COPY is done, so tell it to use the
                               1429                 :                :      * snapshot to make the final data consistent.
                               1430                 :                :      */
 1005 akapila@postgresql.o     1431                 :            169 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
                               1432                 :                :                        slotname, false /* permanent */ , false /* two_phase */ ,
                               1433                 :                :                        MySubscription->failover,
                               1434                 :                :                        CRS_USE_SNAPSHOT, origin_startpos);
                               1435                 :                : 
                               1436                 :                :     /*
                               1437                 :                :      * Set up replication origin tracking. The purpose of doing this before the
                               1438                 :                :      * copy is to avoid doing the copy again due to any error in setting up
                               1439                 :                :      * origin tracking.
                               1440                 :                :      */
 1157                          1441                 :            169 :     originid = replorigin_by_name(originname, true);
                               1442         [ +  - ]:            169 :     if (!OidIsValid(originid))
                               1443                 :                :     {
                               1444                 :                :         /*
                               1445                 :                :          * Origin tracking does not exist, so create it now.
                               1446                 :                :          *
                               1447                 :                :          * Then advance to the LSN obtained from walrcv_create_slot. This is WAL
                               1448                 :                :          * logged for the purpose of recovery. Locks are to prevent the
                               1449                 :                :          * replication origin from vanishing while advancing.
                               1450                 :                :          */
                               1451                 :            169 :         originid = replorigin_create(originname);
                               1452                 :                : 
                               1453                 :            169 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1454                 :            169 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
                               1455                 :                :                            true /* go backward */ , true /* WAL log */ );
                               1456                 :            169 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1457                 :                : 
  461                          1458                 :            169 :         replorigin_session_setup(originid, 0);
 1157                          1459                 :            169 :         replorigin_session_origin = originid;
                               1460                 :                :     }
                               1461                 :                :     else
                               1462                 :                :     {
 1157 akapila@postgresql.o     1463         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1464                 :                :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1465                 :                :                  errmsg("replication origin \"%s\" already exists",
                               1466                 :                :                         originname)));
                               1467                 :                :     }
                               1468                 :                : 
                               1469                 :                :     /*
                               1470                 :                :      * Make sure that the copy command runs as the table owner, unless the
                               1471                 :                :      * user has opted out of that behaviour.
                               1472                 :                :      */
  310 msawada@postgresql.o     1473                 :CBC         169 :     run_as_owner = MySubscription->runasowner;
                               1474         [ +  + ]:            169 :     if (!run_as_owner)
                               1475                 :            168 :         SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
                               1476                 :                : 
                               1477                 :                :     /*
                               1478                 :                :      * Check that our table sync worker has permission to insert into the
                               1479                 :                :      * target table.
                               1480                 :                :      */
                               1481                 :            169 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
                               1482                 :                :                                   ACL_INSERT);
                               1483         [ -  + ]:            169 :     if (aclresult != ACLCHECK_OK)
  310 msawada@postgresql.o     1484                 :UBC           0 :         aclcheck_error(aclresult,
                               1485                 :              0 :                        get_relkind_objtype(rel->rd_rel->relkind),
                               1486                 :              0 :                        RelationGetRelationName(rel));
                               1487                 :                : 
                               1488                 :                :     /*
                               1489                 :                :      * COPY FROM does not honor RLS policies.  That is not a problem for
                               1490                 :                :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
                               1491                 :                :      * who has it implicitly), but other roles should not be able to
                               1492                 :                :      * circumvent RLS.  Disallow logical replication into RLS enabled
                               1493                 :                :      * relations for such roles.
                               1494                 :                :      */
  310 msawada@postgresql.o     1495         [ -  + ]:CBC         169 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
  310 msawada@postgresql.o     1496         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1497                 :                :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1498                 :                :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                               1499                 :                :                         GetUserNameFromId(GetUserId(), true),
                               1500                 :                :                         RelationGetRelationName(rel))));
                               1501                 :                : 
                               1502                 :                :     /* Now do the initial data copy */
  738 tomas.vondra@postgre     1503                 :CBC         169 :     PushActiveSnapshot(GetTransactionSnapshot());
                               1504                 :            169 :     copy_table(rel);
                               1505                 :            160 :     PopActiveSnapshot();
                               1506                 :                : 
 1068 alvherre@alvh.no-ip.     1507                 :            160 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 1277                          1508         [ -  + ]:            160 :     if (res->status != WALRCV_OK_COMMAND)
 1277 alvherre@alvh.no-ip.     1509         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1510                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1511                 :                :                  errmsg("table copy could not finish transaction on publisher: %s",
                               1512                 :                :                         res->err)));
 1277 alvherre@alvh.no-ip.     1513                 :CBC         160 :     walrcv_clear_result(res);
                               1514                 :                : 
  299 tgl@sss.pgh.pa.us        1515         [ +  + ]:            160 :     if (!run_as_owner)
  310 msawada@postgresql.o     1516                 :            159 :         RestoreUserContext(&ucxt);
                               1517                 :                : 
 1277 alvherre@alvh.no-ip.     1518                 :            160 :     table_close(rel, NoLock);
                               1519                 :                : 
                               1520                 :                :     /* Make the copy visible. */
                               1521                 :            160 :     CommandCounterIncrement();
                               1522                 :                : 
                               1523                 :                :     /*
                               1524                 :                :      * Update the persisted state to indicate the COPY phase is done; make it
                               1525                 :                :      * visible to others.
                               1526                 :                :      */
 1157 akapila@postgresql.o     1527                 :            160 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1528                 :            160 :                                MyLogicalRepWorker->relid,
                               1529                 :                :                                SUBREL_STATE_FINISHEDCOPY,
                               1530                 :            160 :                                MyLogicalRepWorker->relstate_lsn);
                               1531                 :                : 
                               1532                 :            160 :     CommitTransactionCommand();
                               1533                 :                : 
                               1534                 :            160 : copy_table_done:
                               1535                 :                : 
                               1536         [ -  + ]:            160 :     elog(DEBUG1,
                               1537                 :                :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
                               1538                 :                :          originname, LSN_FORMAT_ARGS(*origin_startpos));
                               1539                 :                : 
                               1540                 :                :     /*
                               1541                 :                :      * We are done with the initial data synchronization, update the state.
                               1542                 :                :      */
 1277 alvherre@alvh.no-ip.     1543         [ -  + ]:            160 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1544                 :            160 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
                               1545                 :            160 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
                               1546                 :            160 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1547                 :                : 
                               1548                 :                :     /*
                               1549                 :                :      * Finally, wait until the leader apply worker tells us to catch up and
                               1550                 :                :      * then return to let LogicalRepApplyLoop do it.
                               1551                 :                :      */
                               1552                 :            160 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
 2579 peter_e@gmx.net          1553                 :            160 :     return slotname;
                               1554                 :                : }
                               1555                 :                : 
                               1556                 :                : /*
                               1557                 :                :  * Common code to fetch the up-to-date sync state info into the static lists.
                               1558                 :                :  *
                               1559                 :                :  * Returns true if subscription has 1 or more tables, else false.
                               1560                 :                :  *
                               1561                 :                :  * Note: If this function started the transaction (indicated by the parameter)
                               1562                 :                :  * then it is the caller's responsibility to commit it.
                               1563                 :                :  */
                               1564                 :                : static bool
 1005 akapila@postgresql.o     1565                 :           4456 : FetchTableStates(bool *started_tx)
                               1566                 :                : {
                               1567                 :                :     static bool has_subrels = false;
                               1568                 :                : 
                               1569                 :           4456 :     *started_tx = false;
                               1570                 :                : 
                               1571         [ +  + ]:           4456 :     if (!table_states_valid)
                               1572                 :                :     {
                               1573                 :                :         MemoryContext oldctx;
                               1574                 :                :         List       *rstates;
                               1575                 :                :         ListCell   *lc;
                               1576                 :                :         SubscriptionRelState *rstate;
                               1577                 :                : 
                               1578                 :                :         /* Clean the old lists. */
                               1579                 :            736 :         list_free_deep(table_states_not_ready);
                               1580                 :            736 :         table_states_not_ready = NIL;
                               1581                 :                : 
                               1582         [ +  + ]:            736 :         if (!IsTransactionState())
                               1583                 :                :         {
                               1584                 :            720 :             StartTransactionCommand();
                               1585                 :            720 :             *started_tx = true;
                               1586                 :                :         }
                               1587                 :                : 
                               1588                 :                :         /* Fetch all non-ready tables. */
  627 michael@paquier.xyz      1589                 :            736 :         rstates = GetSubscriptionRelations(MySubscription->oid, true);
                               1590                 :                : 
                               1591                 :                :         /* Allocate the tracking info in a permanent memory context. */
 1005 akapila@postgresql.o     1592                 :            736 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
                               1593   [ +  +  +  +  :           1946 :         foreach(lc, rstates)
                                              +  + ]
                               1594                 :                :         {
                               1595                 :           1210 :             rstate = palloc(sizeof(SubscriptionRelState));
                               1596                 :           1210 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
                               1597                 :           1210 :             table_states_not_ready = lappend(table_states_not_ready, rstate);
                               1598                 :                :         }
                               1599                 :            736 :         MemoryContextSwitchTo(oldctx);
                               1600                 :                : 
                               1601                 :                :         /*
                               1602                 :                :          * Does the subscription have tables?
                               1603                 :                :          *
                               1604                 :                :          * If there were not-READY relations found then we know it does. But
                               1605                 :                :          * if table_states_not_ready was empty we still need to check again to
                               1606                 :                :          * see if there are 0 tables.
                               1607                 :                :          */
  606 tgl@sss.pgh.pa.us        1608   [ +  +  +  + ]:            941 :         has_subrels = (table_states_not_ready != NIL) ||
 1005 akapila@postgresql.o     1609                 :            205 :             HasSubscriptionRelations(MySubscription->oid);
                               1610                 :                : 
                               1611                 :            736 :         table_states_valid = true;
                               1612                 :                :     }
                               1613                 :                : 
                               1614                 :           4456 :     return has_subrels;
                               1615                 :                : }
                               1616                 :                : 
                               1617                 :                : /*
                               1618                 :                :  * Execute the initial sync with error handling. Disable the subscription,
                               1619                 :                :  * if it's required.
                               1620                 :                :  *
                               1621                 :                :  * Allocate the slot name in long-lived context on return. Note that we don't
                               1622                 :                :  * handle FATAL errors which are probably because of system resource error and
                               1623                 :                :  * are not repeatable.
                               1624                 :                :  */
                               1625                 :                : static void
  255 akapila@postgresql.o     1626                 :GNC         169 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
                               1627                 :                : {
                               1628                 :            169 :     char       *sync_slotname = NULL;
                               1629                 :                : 
                               1630         [ -  + ]:            169 :     Assert(am_tablesync_worker());
                               1631                 :                : 
                               1632         [ +  + ]:            169 :     PG_TRY();
                               1633                 :                :     {
                               1634                 :                :         /* Call initial sync. */
                               1635                 :            169 :         sync_slotname = LogicalRepSyncTableStart(origin_startpos);
                               1636                 :                :     }
                               1637                 :              8 :     PG_CATCH();
                               1638                 :                :     {
                               1639         [ +  + ]:              8 :         if (MySubscription->disableonerr)
                               1640                 :              1 :             DisableSubscriptionAndExit();
                               1641                 :                :         else
                               1642                 :                :         {
                               1643                 :                :             /*
                               1644                 :                :              * Report the worker failed during table synchronization. Abort
                               1645                 :                :              * the current transaction so that the stats message is sent in an
                               1646                 :                :              * idle state.
                               1647                 :                :              */
                               1648                 :              7 :             AbortOutOfAnyTransaction();
                               1649                 :              7 :             pgstat_report_subscription_error(MySubscription->oid, false);
                               1650                 :                : 
                               1651                 :              7 :             PG_RE_THROW();
                               1652                 :                :         }
                               1653                 :                :     }
                               1654         [ -  + ]:            160 :     PG_END_TRY();
                               1655                 :                : 
                               1656                 :                :     /* allocate slot name in long-lived context */
                               1657                 :            160 :     *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
                               1658                 :            160 :     pfree(sync_slotname);
                               1659                 :            160 : }
                               1660                 :                : 
                               1661                 :                : /*
                               1662                 :                :  * Runs the tablesync worker.
                               1663                 :                :  *
                               1664                 :                :  * It starts syncing tables. After a successful sync, sets streaming options
                               1665                 :                :  * and starts streaming to catch up with the apply worker.
                               1666                 :                :  */
                               1667                 :                : static void
                               1668                 :            169 : run_tablesync_worker(void)
                               1669                 :                : {
                               1670                 :                :     char        originname[NAMEDATALEN];
                               1671                 :            169 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
                               1672                 :            169 :     char       *slotname = NULL;
                               1673                 :                :     WalRcvStreamOptions options;
                               1674                 :                : 
                               1675                 :            169 :     start_table_sync(&origin_startpos, &slotname);
                               1676                 :                : 
                               1677                 :            160 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1678                 :            160 :                                        MyLogicalRepWorker->relid,
                               1679                 :                :                                        originname,
                               1680                 :                :                                        sizeof(originname));
                               1681                 :                : 
                               1682                 :            160 :     set_apply_error_context_origin(originname);
                               1683                 :                : 
                               1684                 :            160 :     set_stream_options(&options, slotname, &origin_startpos);
                               1685                 :                : 
                               1686                 :            160 :     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
                               1687                 :                : 
                               1688                 :                :     /* Apply the changes until we catch up with the apply worker. */
                               1689                 :            160 :     start_apply(origin_startpos);
  255 akapila@postgresql.o     1690                 :UNC           0 : }
                               1691                 :                : 
                               1692                 :                : /* Logical Replication Tablesync worker entry point */
                               1693                 :                : void
  255 akapila@postgresql.o     1694                 :GNC         169 : TablesyncWorkerMain(Datum main_arg)
                               1695                 :                : {
                               1696                 :            169 :     int         worker_slot = DatumGetInt32(main_arg);
                               1697                 :                : 
                               1698                 :            169 :     SetupApplyOrSyncWorker(worker_slot);
                               1699                 :                : 
                               1700                 :            169 :     run_tablesync_worker();
                               1701                 :                : 
  255 akapila@postgresql.o     1702                 :UNC           0 :     finish_sync_worker();
                               1703                 :                : }
                               1704                 :                : 
                               1705                 :                : /*
                               1706                 :                :  * If the subscription has no tables then return false.
                               1707                 :                :  *
                               1708                 :                :  * Otherwise, are all tablesyncs READY?
                               1709                 :                :  *
                               1710                 :                :  * Note: This function is not suitable to be called from outside of apply or
                               1711                 :                :  * tablesync workers because MySubscription needs to be already initialized.
                               1712                 :                :  */
                               1713                 :                : bool
 1005 akapila@postgresql.o     1714                 :CBC          68 : AllTablesyncsReady(void)
                               1715                 :                : {
                               1716                 :             68 :     bool        started_tx = false;
                               1717                 :             68 :     bool        has_subrels = false;
                               1718                 :                : 
                               1719                 :                :     /* We need up-to-date sync state info for subscription tables here. */
                               1720                 :             68 :     has_subrels = FetchTableStates(&started_tx);
                               1721                 :                : 
                               1722         [ +  + ]:             68 :     if (started_tx)
                               1723                 :                :     {
                               1724                 :             13 :         CommitTransactionCommand();
  739 andres@anarazel.de       1725                 :             13 :         pgstat_report_stat(true);
                               1726                 :                :     }
                               1727                 :                : 
                               1728                 :                :     /*
                               1729                 :                :      * Return false when there are no tables in subscription or not all tables
                               1730                 :                :      * are in ready state; true otherwise.
                               1731                 :                :      */
  606 tgl@sss.pgh.pa.us        1732   [ +  -  +  + ]:             68 :     return has_subrels && (table_states_not_ready == NIL);
                               1733                 :                : }
                               1734                 :                : 
                               1735                 :                : /*
                               1736                 :                :  * Update the two_phase state of the specified subscription in pg_subscription.
                               1737                 :                :  */
                               1738                 :                : void
 1005 akapila@postgresql.o     1739                 :              7 : UpdateTwoPhaseState(Oid suboid, char new_state)
                               1740                 :                : {
                               1741                 :                :     Relation    rel;
                               1742                 :                :     HeapTuple   tup;
                               1743                 :                :     bool        nulls[Natts_pg_subscription];
                               1744                 :                :     bool        replaces[Natts_pg_subscription];
                               1745                 :                :     Datum       values[Natts_pg_subscription];
                               1746                 :                : 
                               1747   [ +  -  +  -  :              7 :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
                                              -  + ]
                               1748                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
                               1749                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
                               1750                 :                : 
                               1751                 :              7 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
                               1752                 :              7 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
                               1753         [ -  + ]:              7 :     if (!HeapTupleIsValid(tup))
 1005 akapila@postgresql.o     1754         [ #  # ]:UBC           0 :         elog(ERROR,
                               1755                 :                :              "cache lookup failed for subscription oid %u",
                               1756                 :                :              suboid);
                               1757                 :                : 
                               1758                 :                :     /* Form a new tuple. */
 1005 akapila@postgresql.o     1759                 :CBC           7 :     memset(values, 0, sizeof(values));
                               1760                 :              7 :     memset(nulls, false, sizeof(nulls));
                               1761                 :              7 :     memset(replaces, false, sizeof(replaces));
                               1762                 :                : 
                               1763                 :                :     /* And update/set two_phase state */
                               1764                 :              7 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
                               1765                 :              7 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
                               1766                 :                : 
                               1767                 :              7 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
                               1768                 :                :                             values, nulls, replaces);
                               1769                 :              7 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                               1770                 :                : 
                               1771                 :              7 :     heap_freetuple(tup);
                               1772                 :              7 :     table_close(rel, RowExclusiveLock);
                               1773                 :              7 : }

Generated by: LCOV version 2.1-beta2-3-g6141622