Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/walreceiver.h"
110 : #include "replication/worker_internal.h"
111 : #include "replication/slot.h"
112 : #include "replication/origin.h"
113 : #include "storage/ipc.h"
114 : #include "storage/lmgr.h"
115 : #include "utils/acl.h"
116 : #include "utils/array.h"
117 : #include "utils/builtins.h"
118 : #include "utils/lsyscache.h"
119 : #include "utils/memutils.h"
120 : #include "utils/rls.h"
121 : #include "utils/snapmgr.h"
122 : #include "utils/syscache.h"
123 :
124 : static bool table_states_valid = false;
125 : static List *table_states_not_ready = NIL;
126 : static bool FetchTableStates(bool *started_tx);
127 :
128 : static StringInfo copybuf = NULL;
129 :
130 : /*
131 : * Exit routine for synchronization worker.
132 : */
133 : static void
134 : pg_attribute_noreturn()
2208 peter_e 135 GIC 148 : finish_sync_worker(void)
2208 peter_e 136 ECB : {
137 : /*
138 : * Commit any outstanding transaction. This is the usual case, unless
139 : * there was nothing to do for the table.
140 : */
2208 peter_e 141 GIC 148 : if (IsTransactionState())
2186 peter_e 142 ECB : {
2208 peter_e 143 GIC 148 : CommitTransactionCommand();
368 andres 144 CBC 148 : pgstat_report_stat(true);
2186 peter_e 145 ECB : }
146 :
147 : /* And flush all writes. */
2208 peter_e 148 GIC 148 : XLogFlush(GetXLogWriteRecPtr());
2208 peter_e 149 ECB :
2146 peter_e 150 GIC 148 : StartTransactionCommand();
2208 peter_e 151 CBC 148 : ereport(LOG,
2146 peter_e 152 ECB : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
153 : MySubscription->name,
154 : get_rel_name(MyLogicalRepWorker->relid))));
2146 peter_e 155 GIC 148 : CommitTransactionCommand();
2208 peter_e 156 ECB :
157 : /* Find the leader apply worker and signal it. */
2133 peter_e 158 GIC 148 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
2133 peter_e 159 ECB :
160 : /* Stop gracefully */
2208 peter_e 161 GIC 148 : proc_exit(0);
2208 peter_e 162 ECB : }
163 :
164 : /*
165 : * Wait until the relation sync state is set in the catalog to the expected
166 : * one; return true when it happens.
167 : *
168 : * Returns false if the table sync worker or the table itself have
169 : * disappeared, or the table state has been reset.
170 : *
171 : * Currently, this is used in the apply worker when transitioning from
172 : * CATCHUP state to SYNCDONE.
173 : */
174 : static bool
2133 peter_e 175 GIC 145 : wait_for_relation_state_change(Oid relid, char expected_state)
2208 peter_e 176 ECB : {
177 : char state;
178 :
179 : for (;;)
2208 peter_e 180 GIC 147 : {
2153 bruce 181 ECB : LogicalRepWorker *worker;
182 : XLogRecPtr statelsn;
183 :
2137 peter_e 184 GIC 292 : CHECK_FOR_INTERRUPTS();
2137 peter_e 185 ECB :
906 alvherre 186 GIC 292 : InvalidateCatalogSnapshot();
2133 peter_e 187 CBC 292 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
906 alvherre 188 ECB : relid, &statelsn);
189 :
2133 peter_e 190 GIC 292 : if (state == SUBREL_STATE_UNKNOWN)
906 alvherre 191 LBC 0 : break;
2133 peter_e 192 EUB :
2133 peter_e 193 GIC 292 : if (state == expected_state)
2133 peter_e 194 CBC 145 : return true;
2133 peter_e 195 ECB :
196 : /* Check if the sync worker is still running and bail if not. */
2208 peter_e 197 GIC 147 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
906 alvherre 198 CBC 147 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
2136 peter_e 199 ECB : false);
2133 peter_e 200 GIC 147 : LWLockRelease(LogicalRepWorkerLock);
2208 peter_e 201 CBC 147 : if (!worker)
906 alvherre 202 LBC 0 : break;
2136 peter_e 203 EUB :
1598 tmunro 204 GIC 147 : (void) WaitLatch(MyLatch,
1598 tmunro 205 ECB : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
206 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
207 :
2133 andres 208 GIC 147 : ResetLatch(MyLatch);
2133 peter_e 209 ECB : }
210 :
2133 peter_e 211 UIC 0 : return false;
2133 peter_e 212 EUB : }
213 :
214 : /*
215 : * Wait until the apply worker changes the state of our synchronization
216 : * worker to the expected one.
217 : *
218 : * Used when transitioning from SYNCWAIT state to CATCHUP.
219 : *
220 : * Returns false if the apply worker has disappeared.
221 : */
222 : static bool
2133 peter_e 223 GIC 149 : wait_for_worker_state_change(char expected_state)
2133 peter_e 224 ECB : {
225 : int rc;
226 :
227 : for (;;)
2133 peter_e 228 GIC 149 : {
2133 peter_e 229 ECB : LogicalRepWorker *worker;
230 :
2133 peter_e 231 GIC 298 : CHECK_FOR_INTERRUPTS();
2133 peter_e 232 ECB :
233 : /*
234 : * Done if already in correct state. (We assume this fetch is atomic
235 : * enough to not give a misleading answer if we do it with no lock.)
236 : */
2109 tgl 237 GIC 298 : if (MyLogicalRepWorker->relstate == expected_state)
2109 tgl 238 CBC 149 : return true;
2109 tgl 239 ECB :
240 : /*
241 : * Bail out if the apply worker has died, else signal it we're
242 : * waiting.
243 : */
2133 peter_e 244 GIC 149 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
2133 peter_e 245 CBC 149 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
2133 peter_e 246 ECB : InvalidOid, false);
2109 tgl 247 GIC 149 : if (worker && worker->proc)
2109 tgl 248 CBC 149 : logicalrep_worker_wakeup_ptr(worker);
2133 peter_e 249 149 : LWLockRelease(LogicalRepWorkerLock);
250 149 : if (!worker)
2109 tgl 251 LBC 0 : break;
2208 peter_e 252 EUB :
253 : /*
254 : * Wait. We expect to get a latch signal back from the apply worker,
255 : * but use a timeout in case it dies without sending one.
256 : */
2133 andres 257 GIC 149 : rc = WaitLatch(MyLatch,
1598 tmunro 258 ECB : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
259 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
260 :
2109 tgl 261 GIC 149 : if (rc & WL_LATCH_SET)
2109 tgl 262 CBC 149 : ResetLatch(MyLatch);
2208 peter_e 263 ECB : }
264 :
2208 peter_e 265 UIC 0 : return false;
2208 peter_e 266 EUB : }
267 :
268 : /*
269 : * Callback from syscache invalidation.
270 : */
271 : void
2208 peter_e 272 GIC 1038 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
2208 peter_e 273 ECB : {
2208 peter_e 274 GIC 1038 : table_states_valid = false;
2208 peter_e 275 CBC 1038 : }
2208 peter_e 276 ECB :
277 : /*
278 : * Handle table synchronization cooperation from the synchronization
279 : * worker.
280 : *
281 : * If the sync worker is in CATCHUP state and reached (or passed) the
282 : * predetermined synchronization point in the WAL stream, mark the table as
283 : * SYNCDONE and finish.
284 : */
285 : static void
2208 peter_e 286 GIC 170 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
2208 peter_e 287 ECB : {
2208 peter_e 288 GIC 170 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
2208 peter_e 289 ECB :
2208 peter_e 290 GIC 170 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
2208 peter_e 291 CBC 170 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
2208 peter_e 292 ECB : {
293 : TimeLineID tli;
786 akapila 294 GIC 149 : char syncslotname[NAMEDATALEN] = {0};
222 akapila 295 GNC 149 : char originname[NAMEDATALEN] = {0};
2208 peter_e 296 ECB :
2133 peter_e 297 CBC 149 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
2208 peter_e 298 GIC 149 : MyLogicalRepWorker->relstate_lsn = current_lsn;
2208 peter_e 299 ECB :
2208 peter_e 300 CBC 149 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
301 :
786 akapila 302 ECB : /*
303 : * UpdateSubscriptionRelState must be called within a transaction.
304 : */
786 akapila 305 GIC 149 : if (!IsTransactionState())
786 akapila 306 CBC 149 : StartTransactionCommand();
786 akapila 307 ECB :
1829 peter_e 308 GIC 149 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1829 peter_e 309 CBC 149 : MyLogicalRepWorker->relid,
310 149 : MyLogicalRepWorker->relstate,
311 149 : MyLogicalRepWorker->relstate_lsn);
2208 peter_e 312 ECB :
313 : /*
314 : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
315 : * the slot.
316 : */
697 alvherre 317 GIC 149 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
786 akapila 318 ECB :
319 : /*
320 : * Cleanup the tablesync slot.
321 : *
322 : * This has to be done after updating the state because otherwise if
323 : * there is an error while doing the database operations we won't be
324 : * able to rollback dropped slot.
325 : */
786 akapila 326 GIC 148 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
786 akapila 327 CBC 148 : MyLogicalRepWorker->relid,
783 akapila 328 ECB : syncslotname,
329 : sizeof(syncslotname));
330 :
331 : /*
332 : * It is important to give an error if we are unable to drop the slot,
333 : * otherwise, it won't be dropped till the corresponding subscription
334 : * is dropped. So passing missing_ok = false.
335 : */
697 alvherre 336 GIC 148 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
786 akapila 337 ECB :
209 akapila 338 GNC 148 : CommitTransactionCommand();
339 148 : pgstat_report_stat(false);
340 :
341 : /*
342 : * Start a new transaction to clean up the tablesync origin tracking.
343 : * This transaction will be ended within the finish_sync_worker().
344 : * Now, even, if we fail to remove this here, the apply worker will
345 : * ensure to clean it up afterward.
346 : *
347 : * We need to do this after the table state is set to SYNCDONE.
348 : * Otherwise, if an error occurs while performing the database
349 : * operation, the worker will be restarted and the in-memory state of
350 : * replication progress (remote_lsn) won't be rolled-back which would
351 : * have been cleared before restart. So, the restarted worker will use
352 : * invalid replication progress state resulting in replay of
353 : * transactions that have already been applied.
354 : */
355 148 : StartTransactionCommand();
356 :
180 357 148 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
358 148 : MyLogicalRepWorker->relid,
359 : originname,
360 : sizeof(originname));
361 :
362 : /*
363 : * Resetting the origin session removes the ownership of the slot.
364 : * This is needed to allow the origin to be dropped.
365 : */
209 366 148 : replorigin_session_reset();
367 148 : replorigin_session_origin = InvalidRepOriginId;
368 148 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
369 148 : replorigin_session_origin_timestamp = 0;
370 :
371 : /*
372 : * Drop the tablesync's origin tracking if exists.
373 : *
374 : * There is a chance that the user is concurrently performing refresh
375 : * for the subscription where we remove the table state and its origin
376 : * or the apply worker would have removed this origin. So passing
377 : * missing_ok = true.
378 : */
379 148 : replorigin_drop_by_name(originname, true, false);
380 :
2208 peter_e 381 GIC 148 : finish_sync_worker();
2208 peter_e 382 ECB : }
383 : else
2208 peter_e 384 GIC 21 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
385 21 : }
386 :
387 : /*
388 : * Handle table synchronization cooperation from the apply worker.
389 : *
390 : * Walk over all subscription tables that are individually tracked by the
391 : * apply process (currently, all that have state other than
392 : * SUBREL_STATE_READY) and manage synchronization for them.
393 : *
394 : * If there are tables that need synchronizing and are not being synchronized
395 : * yet, start sync workers for them (if there are free slots for sync
396 : * workers). To prevent starting the sync worker for the same relation at a
397 : * high frequency after a failure, we store its last start time with each sync
398 : * state info. We start the sync worker for the same relation after waiting
2173 peter_e 399 ECB : * at least wal_retrieve_retry_interval.
400 : *
2208 401 : * For tables that are being synchronized already, check if sync workers
2133 402 : * either need action from the apply worker or have finished. This is the
403 : * SYNCWAIT to CATCHUP transition.
404 : *
405 : * If the synchronization position is reached (SYNCDONE), then the table can
406 : * be marked as READY and is no longer tracked.
407 : */
408 : static void
2208 peter_e 409 GIC 2476 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
2208 peter_e 410 ECB : {
2173 411 : struct tablesync_start_time_mapping
412 : {
413 : Oid relid;
414 : TimestampTz last_start_time;
415 : };
416 : static HTAB *last_start_times = NULL;
417 : ListCell *lc;
2162 peter_e 418 GIC 2476 : bool started_tx = false;
93 tgl 419 GNC 2476 : bool should_exit = false;
420 :
2208 peter_e 421 GIC 2476 : Assert(!IsTransactionState());
422 :
423 : /* We need up-to-date sync state info for subscription tables here. */
634 akapila 424 CBC 2476 : FetchTableStates(&started_tx);
425 :
2173 peter_e 426 ECB : /*
427 : * Prepare a hash table for tracking last start times of workers, to avoid
428 : * immediate restarts. We don't need it if there are no tables that need
429 : * syncing.
430 : */
235 tgl 431 GNC 2476 : if (table_states_not_ready != NIL && !last_start_times)
2173 peter_e 432 GIC 88 : {
433 : HASHCTL ctl;
434 :
435 88 : ctl.keysize = sizeof(Oid);
436 88 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
437 88 : last_start_times = hash_create("Logical replication table sync worker start times",
438 : 256, &ctl, HASH_ELEM | HASH_BLOBS);
439 : }
440 :
441 : /*
442 : * Clean up the hash table when we're done with all tables (just to
443 : * release the bit of memory).
444 : */
235 tgl 445 GNC 2388 : else if (table_states_not_ready == NIL && last_start_times)
446 : {
2173 peter_e 447 GIC 68 : hash_destroy(last_start_times);
448 68 : last_start_times = NULL;
449 : }
450 :
451 : /*
452 : * Process all tables that are being synchronized.
453 : */
634 akapila 454 CBC 3865 : foreach(lc, table_states_not_ready)
2208 peter_e 455 ECB : {
2153 bruce 456 GIC 1389 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
457 :
2208 peter_e 458 CBC 1389 : if (rstate->state == SUBREL_STATE_SYNCDONE)
2208 peter_e 459 ECB : {
460 : /*
461 : * Apply has caught up to the position where the table sync has
462 : * finished. Mark the table as ready so that the apply will just
463 : * continue to replicate it normally.
464 : */
2208 peter_e 465 GIC 144 : if (current_lsn >= rstate->lsn)
466 : {
467 : char originname[NAMEDATALEN];
209 akapila 468 ECB :
2208 peter_e 469 GIC 144 : rstate->state = SUBREL_STATE_READY;
2208 peter_e 470 CBC 144 : rstate->lsn = current_lsn;
2162 471 144 : if (!started_tx)
472 : {
2162 peter_e 473 GIC 2 : StartTransactionCommand();
474 2 : started_tx = true;
475 : }
476 :
786 akapila 477 ECB : /*
478 : * Remove the tablesync origin tracking if exists.
209 479 : *
480 : * There is a chance that the user is concurrently performing
481 : * refresh for the subscription where we remove the table
482 : * state and its origin or the tablesync worker would have
483 : * already removed this origin. We can't rely on tablesync
484 : * worker to remove the origin tracking as if there is any
485 : * error while dropping we won't restart it to drop the
486 : * origin. So passing missing_ok = true.
487 : */
180 akapila 488 GNC 144 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
489 : rstate->relid,
490 : originname,
491 : sizeof(originname));
209 akapila 492 CBC 144 : replorigin_drop_by_name(originname, true, false);
493 :
209 akapila 494 ECB : /*
495 : * Update the state to READY only after the origin cleanup.
496 : */
1829 peter_e 497 GIC 144 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
498 144 : rstate->relid, rstate->state,
499 : rstate->lsn);
500 : }
501 : }
502 : else
503 : {
504 : LogicalRepWorker *syncworker;
505 :
506 : /*
507 : * Look for a sync worker for this relation.
508 : */
2208 peter_e 509 CBC 1245 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
510 :
2208 peter_e 511 GIC 1245 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
512 : rstate->relid, false);
2109 tgl 513 ECB :
2208 peter_e 514 GIC 1245 : if (syncworker)
515 : {
516 : /* Found one, update our copy of its state */
517 542 : SpinLockAcquire(&syncworker->relmutex);
2208 peter_e 518 CBC 542 : rstate->state = syncworker->relstate;
519 542 : rstate->lsn = syncworker->relstate_lsn;
2109 tgl 520 GIC 542 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
521 : {
522 : /*
523 : * Sync worker is waiting for apply. Tell sync worker it
524 : * can catchup now.
525 : */
526 145 : syncworker->relstate = SUBREL_STATE_CATCHUP;
527 145 : syncworker->relstate_lsn =
528 145 : Max(syncworker->relstate_lsn, current_lsn);
529 : }
2208 peter_e 530 CBC 542 : SpinLockRelease(&syncworker->relmutex);
531 :
2109 tgl 532 ECB : /* If we told worker to catch up, wait for it. */
2109 tgl 533 GIC 542 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
534 : {
2109 tgl 535 ECB : /* Signal the sync worker, as it may be waiting for us. */
2109 tgl 536 GIC 145 : if (syncworker->proc)
537 145 : logicalrep_worker_wakeup_ptr(syncworker);
2109 tgl 538 ECB :
539 : /* Now safe to release the LWLock */
2109 tgl 540 CBC 145 : LWLockRelease(LogicalRepWorkerLock);
2109 tgl 541 ECB :
542 : /*
543 : * Enter busy loop and wait for synchronization worker to
544 : * reach expected state (or die trying).
545 : */
2109 tgl 546 GIC 145 : if (!started_tx)
2109 tgl 547 ECB : {
2109 tgl 548 LBC 0 : StartTransactionCommand();
549 0 : started_tx = true;
550 : }
2109 tgl 551 ECB :
2109 tgl 552 GIC 145 : wait_for_relation_state_change(rstate->relid,
553 : SUBREL_STATE_SYNCDONE);
2109 tgl 554 ECB : }
555 : else
2109 tgl 556 GIC 397 : LWLockRelease(LogicalRepWorkerLock);
2208 peter_e 557 ECB : }
558 : else
559 : {
560 : /*
2126 tgl 561 : * If there is no sync worker for this table yet, count
562 : * running sync workers for this subscription, while we have
563 : * the lock.
564 : */
565 : int nsyncworkers =
2109 tgl 566 GIC 703 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
2208 peter_e 567 ECB :
568 : /* Now safe to release the LWLock */
2109 tgl 569 GBC 703 : LWLockRelease(LogicalRepWorkerLock);
2208 peter_e 570 EUB :
571 : /*
572 : * If there are free sync worker slot(s), start a new sync
2109 tgl 573 ECB : * worker for the table.
574 : */
2109 tgl 575 GIC 703 : if (nsyncworkers < max_sync_workers_per_subscription)
576 : {
2109 tgl 577 CBC 159 : TimestampTz now = GetCurrentTimestamp();
578 : struct tablesync_start_time_mapping *hentry;
579 : bool found;
580 :
2109 tgl 581 GIC 159 : hentry = hash_search(last_start_times, &rstate->relid,
582 : HASH_ENTER, &found);
583 :
584 170 : if (!found ||
585 11 : TimestampDifferenceExceeds(hentry->last_start_time, now,
586 : wal_retrieve_retry_interval))
2109 tgl 587 ECB : {
2109 tgl 588 GIC 153 : logicalrep_worker_launch(MyLogicalRepWorker->dbid,
589 153 : MySubscription->oid,
2109 tgl 590 CBC 153 : MySubscription->name,
2109 tgl 591 GIC 153 : MyLogicalRepWorker->userid,
592 : rstate->relid,
593 : DSM_HANDLE_INVALID);
594 153 : hentry->last_start_time = now;
595 : }
596 : }
2208 peter_e 597 ECB : }
598 : }
599 : }
600 :
2162 peter_e 601 GIC 2476 : if (started_tx)
602 : {
603 : /*
604 : * Even when the two_phase mode is requested by the user, it remains
605 : * as 'pending' until all tablesyncs have reached READY state.
606 : *
607 : * When this happens, we restart the apply worker and (if the
608 : * conditions are still ok) then the two_phase tri-state will become
609 : * 'enabled' at that time.
610 : *
611 : * Note: If the subscription has no tables then leave the state as
612 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
613 : * work.
614 : */
93 tgl 615 GNC 644 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
616 : {
617 22 : CommandCounterIncrement(); /* make updates visible */
618 22 : if (AllTablesyncsReady())
619 : {
620 5 : ereport(LOG,
621 : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
622 : MySubscription->name)));
623 5 : should_exit = true;
624 : }
625 : }
626 :
2162 peter_e 627 CBC 644 : CommitTransactionCommand();
368 andres 628 GIC 644 : pgstat_report_stat(true);
629 : }
630 :
93 tgl 631 GNC 2476 : if (should_exit)
632 : {
633 : /*
634 : * Reset the last-start time for this worker so that the launcher will
635 : * restart it without waiting for wal_retrieve_retry_interval.
636 : */
77 637 5 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
638 :
93 639 5 : proc_exit(0);
640 : }
2208 peter_e 641 CBC 2471 : }
2208 peter_e 642 ECB :
643 : /*
644 : * Process possible state change(s) of tables that are being synchronized.
645 : */
646 : void
2208 peter_e 647 CBC 2668 : process_syncing_tables(XLogRecPtr current_lsn)
2208 peter_e 648 ECB : {
649 : /*
650 : * Skip for parallel apply workers because they only operate on tables
651 : * that are in a READY state. See pa_can_start() and
652 : * should_apply_changes_for_rel().
653 : */
90 akapila 654 GNC 2668 : if (am_parallel_apply_worker())
655 22 : return;
656 :
2208 peter_e 657 GIC 2646 : if (am_tablesync_worker())
658 170 : process_syncing_tables_for_sync(current_lsn);
2208 peter_e 659 ECB : else
2208 peter_e 660 GIC 2476 : process_syncing_tables_for_apply(current_lsn);
661 : }
662 :
663 : /*
664 : * Create list of columns for COPY based on logical relation mapping.
665 : */
2208 peter_e 666 ECB : static List *
2208 peter_e 667 GIC 155 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
668 : {
669 155 : List *attnamelist = NIL;
670 : int i;
671 :
2152 672 410 : for (i = 0; i < rel->remoterel.natts; i++)
673 : {
2208 674 255 : attnamelist = lappend(attnamelist,
2152 675 255 : makeString(rel->remoterel.attnames[i]));
676 : }
677 :
678 :
2208 679 155 : return attnamelist;
2208 peter_e 680 ECB : }
681 :
682 : /*
683 : * Data source callback for the COPY FROM, which reads from the remote
684 : * connection and passes the data back to our local COPY.
685 : */
686 : static int
2208 peter_e 687 GIC 13865 : copy_read_data(void *outbuf, int minread, int maxread)
2208 peter_e 688 ECB : {
2153 bruce 689 GIC 13865 : int bytesread = 0;
690 : int avail;
691 :
2126 peter_e 692 ECB : /* If there are some leftover data from previous read, use it. */
2208 peter_e 693 CBC 13865 : avail = copybuf->len - copybuf->cursor;
2208 peter_e 694 GIC 13865 : if (avail)
695 : {
2208 peter_e 696 LBC 0 : if (avail > maxread)
2208 peter_e 697 UIC 0 : avail = maxread;
698 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
699 0 : copybuf->cursor += avail;
700 0 : maxread -= avail;
701 0 : bytesread += avail;
2208 peter_e 702 ECB : }
703 :
2137 peter_e 704 CBC 13866 : while (maxread > 0 && bytesread < minread)
705 : {
2208 706 13866 : pgsocket fd = PGINVALID_SOCKET;
707 : int len;
2208 peter_e 708 GIC 13866 : char *buf = NULL;
709 :
710 : for (;;)
711 : {
2208 peter_e 712 ECB : /* Try read the data. */
697 alvherre 713 GIC 13866 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
714 :
2208 peter_e 715 13866 : CHECK_FOR_INTERRUPTS();
716 :
717 13866 : if (len == 0)
718 1 : break;
2208 peter_e 719 CBC 13865 : else if (len < 0)
720 13865 : return bytesread;
721 : else
2208 peter_e 722 ECB : {
723 : /* Process the data */
2208 peter_e 724 GIC 13712 : copybuf->data = buf;
2208 peter_e 725 CBC 13712 : copybuf->len = len;
2208 peter_e 726 GIC 13712 : copybuf->cursor = 0;
727 :
728 13712 : avail = copybuf->len - copybuf->cursor;
729 13712 : if (avail > maxread)
2208 peter_e 730 UIC 0 : avail = maxread;
2208 peter_e 731 GIC 13712 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
2208 peter_e 732 CBC 13712 : outbuf = (void *) ((char *) outbuf + avail);
2208 peter_e 733 GIC 13712 : copybuf->cursor += avail;
2208 peter_e 734 CBC 13712 : maxread -= avail;
2208 peter_e 735 GIC 13712 : bytesread += avail;
736 : }
2208 peter_e 737 ECB :
2208 peter_e 738 GIC 13712 : if (maxread <= 0 || bytesread >= minread)
2208 peter_e 739 CBC 13712 : return bytesread;
2208 peter_e 740 ECB : }
741 :
742 : /*
743 : * Wait for more data or latch.
744 : */
1598 tmunro 745 GIC 1 : (void) WaitLatchOrSocket(MyLatch,
746 : WL_SOCKET_READABLE | WL_LATCH_SET |
747 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
748 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
749 :
2133 andres 750 1 : ResetLatch(MyLatch);
751 : }
2208 peter_e 752 ECB :
2208 peter_e 753 UIC 0 : return bytesread;
2208 peter_e 754 ECB : }
755 :
756 :
757 : /*
758 : * Get information about remote relation in similar fashion the RELATION
411 akapila 759 : * message provides during replication. This function also returns the relation
760 : * qualifications to be used in the COPY command.
2208 peter_e 761 EUB : */
762 : static void
2208 peter_e 763 GBC 155 : fetch_remote_table_info(char *nspname, char *relname,
411 akapila 764 EUB : LogicalRepRelation *lrel, List **qual)
2208 peter_e 765 : {
2153 bruce 766 : WalRcvExecResult *res;
767 : StringInfoData cmd;
768 : TupleTableSlot *slot;
1116 peter 769 CBC 155 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
379 tomas.vondra 770 GIC 155 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
411 akapila 771 CBC 155 : Oid qualRow[] = {TEXTOID};
772 : bool isnull;
2153 bruce 773 ECB : int natt;
774 : ListCell *lc;
379 tomas.vondra 775 GIC 155 : Bitmapset *included_cols = NULL;
776 :
2208 peter_e 777 155 : lrel->nspname = nspname;
2208 peter_e 778 CBC 155 : lrel->relname = relname;
779 :
2208 peter_e 780 ECB : /* First fetch Oid and replica identity. */
2208 peter_e 781 GIC 155 : initStringInfo(&cmd);
1116 peter 782 CBC 155 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
2153 bruce 783 ECB : " FROM pg_catalog.pg_class c"
784 : " INNER JOIN pg_catalog.pg_namespace n"
785 : " ON (c.relnamespace = n.oid)"
786 : " WHERE n.nspname = %s"
787 : " AND c.relname = %s",
788 : quote_literal_cstr(nspname),
789 : quote_literal_cstr(relname));
697 alvherre 790 CBC 155 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
697 alvherre 791 ECB : lengthof(tableRow), tableRow);
792 :
2208 peter_e 793 CBC 155 : if (res->status != WALRCV_OK_TUPLES)
2208 peter_e 794 LBC 0 : ereport(ERROR,
662 tgl 795 EUB : (errcode(ERRCODE_CONNECTION_FAILURE),
662 tgl 796 ECB : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
2208 peter_e 797 : nspname, relname, res->err)));
798 :
1606 andres 799 CBC 155 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2208 peter_e 800 155 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2208 peter_e 801 UIC 0 : ereport(ERROR,
802 : (errcode(ERRCODE_UNDEFINED_OBJECT),
662 tgl 803 ECB : errmsg("table \"%s.%s\" not found on publisher",
2208 peter_e 804 : nspname, relname)));
805 :
2208 peter_e 806 GIC 155 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
807 155 : Assert(!isnull);
808 155 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
809 155 : Assert(!isnull);
1116 peter 810 CBC 155 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
1116 peter 811 GIC 155 : Assert(!isnull);
812 :
2208 peter_e 813 155 : ExecDropSingleTupleTableSlot(slot);
814 155 : walrcv_clear_result(res);
2208 peter_e 815 ECB :
816 :
817 : /*
379 tomas.vondra 818 EUB : * Get column lists for each relation.
819 : *
820 : * We need to do this before fetching info about column names and types,
821 : * so that we can skip columns that should not be replicated.
822 : */
379 tomas.vondra 823 GIC 155 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
824 : {
825 : WalRcvExecResult *pubres;
826 : TupleTableSlot *tslot;
311 akapila 827 155 : Oid attrsRow[] = {INT2VECTOROID};
379 tomas.vondra 828 ECB : StringInfoData pub_names;
829 :
379 tomas.vondra 830 GIC 155 : initStringInfo(&pub_names);
831 495 : foreach(lc, MySubscription->publications)
832 : {
232 drowley 833 340 : if (foreach_current_index(lc) > 0)
215 drowley 834 GNC 185 : appendStringInfoString(&pub_names, ", ");
379 tomas.vondra 835 CBC 340 : appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
379 tomas.vondra 836 ECB : }
837 :
838 : /*
839 : * Fetch info about column lists for the relation (from all the
840 : * publications).
841 : */
379 tomas.vondra 842 GIC 155 : resetStringInfo(&cmd);
379 tomas.vondra 843 CBC 155 : appendStringInfo(&cmd,
311 akapila 844 ECB : "SELECT DISTINCT"
845 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
846 : " THEN NULL ELSE gpt.attrs END)"
325 847 : " FROM pg_publication p,"
311 848 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
849 : " pg_class c"
850 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
851 : " AND p.pubname IN ( %s )",
852 : lrel->remoteid,
853 : pub_names.data);
854 :
379 tomas.vondra 855 GIC 155 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
379 tomas.vondra 856 ECB : lengthof(attrsRow), attrsRow);
857 :
379 tomas.vondra 858 GIC 155 : if (pubres->status != WALRCV_OK_TUPLES)
379 tomas.vondra 859 LBC 0 : ereport(ERROR,
379 tomas.vondra 860 EUB : (errcode(ERRCODE_CONNECTION_FAILURE),
861 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
862 : nspname, relname, pubres->err)));
863 :
864 : /*
311 akapila 865 ECB : * We don't support the case where the column list is different for
866 : * the same table when combining publications. See comments atop
311 akapila 867 EUB : * fetch_table_list. So there should be only one row returned.
868 : * Although we already checked this when creating the subscription, we
869 : * still need to check here in case the column list was changed after
870 : * creating the subscription and before the sync worker is started.
871 : */
311 akapila 872 CBC 155 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
311 akapila 873 LBC 0 : ereport(ERROR,
311 akapila 874 ECB : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
875 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
876 : nspname, relname));
877 :
878 : /*
879 : * Get the column list and build a single bitmap with the attnums.
880 : *
881 : * If we find a NULL value, it means all the columns should be
882 : * replicated.
883 : */
232 drowley 884 GIC 155 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
885 155 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
886 : {
887 155 : Datum cfval = slot_getattr(tslot, 1, &isnull);
888 :
311 akapila 889 CBC 155 : if (!isnull)
890 : {
891 : ArrayType *arr;
892 : int nelems;
311 akapila 893 ECB : int16 *elems;
894 :
311 akapila 895 GIC 19 : arr = DatumGetArrayTypeP(cfval);
311 akapila 896 CBC 19 : nelems = ARR_DIMS(arr)[0];
897 19 : elems = (int16 *) ARR_DATA_PTR(arr);
898 :
899 53 : for (natt = 0; natt < nelems; natt++)
900 34 : included_cols = bms_add_member(included_cols, elems[natt]);
311 akapila 901 ECB : }
902 :
232 drowley 903 GIC 155 : ExecClearTuple(tslot);
904 : }
905 155 : ExecDropSingleTupleTableSlot(tslot);
906 :
379 tomas.vondra 907 155 : walrcv_clear_result(pubres);
379 tomas.vondra 908 ECB :
379 tomas.vondra 909 CBC 155 : pfree(pub_names.data);
910 : }
911 :
912 : /*
913 : * Now fetch column names and types.
914 : */
2208 peter_e 915 GIC 155 : resetStringInfo(&cmd);
916 155 : appendStringInfo(&cmd,
917 : "SELECT a.attnum,"
918 : " a.attname,"
919 : " a.atttypid,"
920 : " a.attnum = ANY(i.indkey)"
2208 peter_e 921 ECB : " FROM pg_catalog.pg_attribute a"
922 : " LEFT JOIN pg_catalog.pg_index i"
923 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
924 : " WHERE a.attnum > 0::pg_catalog.int2"
1471 peter 925 EUB : " AND NOT a.attisdropped %s"
926 : " AND a.attrelid = %u"
927 : " ORDER BY a.attnum",
928 : lrel->remoteid,
697 alvherre 929 GIC 155 : (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
930 : "AND a.attgenerated = ''" : ""),
931 : lrel->remoteid);
932 155 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
933 : lengthof(attrRow), attrRow);
934 :
2208 peter_e 935 155 : if (res->status != WALRCV_OK_TUPLES)
2208 peter_e 936 UIC 0 : ereport(ERROR,
937 : (errcode(ERRCODE_CONNECTION_FAILURE),
662 tgl 938 ECB : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
2208 peter_e 939 EUB : nspname, relname, res->err)));
940 :
941 : /* We don't know the number of rows coming, so allocate enough space. */
2208 peter_e 942 GIC 155 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
943 155 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
944 155 : lrel->attkeys = NULL;
945 :
946 : /*
947 : * Store the columns as a list of names. Ignore those that are not
948 : * present in the column list, if there is one.
949 : */
2208 peter_e 950 CBC 155 : natt = 0;
1606 andres 951 155 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2208 peter_e 952 GIC 432 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2208 peter_e 953 ECB : {
954 : char *rel_colname;
379 tomas.vondra 955 : AttrNumber attnum;
956 :
379 tomas.vondra 957 GIC 277 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
958 277 : Assert(!isnull);
959 :
960 : /* If the column is not in the column list, skip it. */
379 tomas.vondra 961 CBC 277 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
379 tomas.vondra 962 ECB : {
379 tomas.vondra 963 CBC 22 : ExecClearTuple(slot);
379 tomas.vondra 964 GIC 22 : continue;
379 tomas.vondra 965 ECB : }
966 :
379 tomas.vondra 967 GIC 255 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2208 peter_e 968 255 : Assert(!isnull);
379 tomas.vondra 969 ECB :
379 tomas.vondra 970 GIC 255 : lrel->attnames[natt] = rel_colname;
379 tomas.vondra 971 CBC 255 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
2208 peter_e 972 GIC 255 : Assert(!isnull);
379 tomas.vondra 973 ECB :
379 tomas.vondra 974 GIC 255 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
2208 peter_e 975 CBC 94 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
976 :
977 : /* Should never happen. */
2208 peter_e 978 GIC 255 : if (++natt >= MaxTupleAttributeNumber)
2208 peter_e 979 UIC 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
980 : nspname, relname);
2208 peter_e 981 ECB :
2208 peter_e 982 CBC 255 : ExecClearTuple(slot);
983 : }
2208 peter_e 984 GIC 155 : ExecDropSingleTupleTableSlot(slot);
985 :
986 155 : lrel->natts = natt;
987 :
988 155 : walrcv_clear_result(res);
989 :
990 : /*
991 : * Get relation's row filter expressions. DISTINCT avoids the same
992 : * expression of a table in multiple publications from being included
993 : * multiple times in the final expression.
994 : *
411 akapila 995 ECB : * We need to copy the row even if it matches just one of the
996 : * publications, so we later combine all the quals with OR.
997 : *
998 : * For initial synchronization, row filtering can be ignored in following
999 : * cases:
1000 : *
1001 : * 1) one of the subscribed publications for the table hasn't specified
411 akapila 1002 EUB : * any row filter
1003 : *
1004 : * 2) one of the subscribed publications has puballtables set to true
1005 : *
1006 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1007 : * that includes this relation
411 akapila 1008 ECB : */
411 akapila 1009 CBC 155 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
411 akapila 1010 ECB : {
1011 : StringInfoData pub_names;
1012 :
1013 : /* Build the pubname list. */
411 akapila 1014 GIC 155 : initStringInfo(&pub_names);
1015 495 : foreach(lc, MySubscription->publications)
411 akapila 1016 ECB : {
411 akapila 1017 CBC 340 : char *pubname = strVal(lfirst(lc));
411 akapila 1018 ECB :
232 drowley 1019 GIC 340 : if (foreach_current_index(lc) > 0)
411 akapila 1020 185 : appendStringInfoString(&pub_names, ", ");
1021 :
1022 340 : appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
411 akapila 1023 ECB : }
1024 :
1025 : /* Check for row filters. */
411 akapila 1026 GIC 155 : resetStringInfo(&cmd);
411 akapila 1027 CBC 155 : appendStringInfo(&cmd,
1028 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
325 akapila 1029 ECB : " FROM pg_publication p,"
411 1030 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1031 : " WHERE gpt.relid = %u"
1032 : " AND p.pubname IN ( %s )",
1033 : lrel->remoteid,
1034 : pub_names.data);
1035 :
411 akapila 1036 CBC 155 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
411 akapila 1037 ECB :
411 akapila 1038 CBC 155 : if (res->status != WALRCV_OK_TUPLES)
411 akapila 1039 UIC 0 : ereport(ERROR,
411 akapila 1040 ECB : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1041 : nspname, relname, res->err)));
1042 :
1043 : /*
1044 : * Multiple row filter expressions for the same table will be combined
411 akapila 1045 EUB : * by COPY using OR. If any of the filter expressions for this table
1046 : * are null, it means the whole table will be copied. In this case it
1047 : * is not necessary to construct a unified row filter expression at
411 akapila 1048 ECB : * all.
1049 : */
411 akapila 1050 CBC 155 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
411 akapila 1051 GIC 169 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
411 akapila 1052 ECB : {
411 akapila 1053 GIC 159 : Datum rf = slot_getattr(slot, 1, &isnull);
411 akapila 1054 ECB :
411 akapila 1055 GIC 159 : if (!isnull)
1056 14 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1057 : else
1058 : {
1059 : /* Ignore filters and cleanup as necessary. */
1060 145 : if (*qual)
1061 : {
1062 3 : list_free_deep(*qual);
1063 3 : *qual = NIL;
1064 : }
1065 145 : break;
1066 : }
1067 :
1068 14 : ExecClearTuple(slot);
1069 : }
1070 155 : ExecDropSingleTupleTableSlot(slot);
1071 :
1072 155 : walrcv_clear_result(res);
1073 : }
1074 :
2208 peter_e 1075 CBC 155 : pfree(cmd.data);
2208 peter_e 1076 GIC 155 : }
1077 :
1078 : /*
1079 : * Copy existing data of a table from publisher.
2208 peter_e 1080 ECB : *
1081 : * Caller is responsible for locking the local relation.
1082 : */
1083 : static void
2208 peter_e 1084 GIC 155 : copy_table(Relation rel)
2208 peter_e 1085 ECB : {
1086 : LogicalRepRelMapEntry *relmapentry;
1087 : LogicalRepRelation lrel;
411 akapila 1088 CBC 155 : List *qual = NIL;
1089 : WalRcvExecResult *res;
1090 : StringInfoData cmd;
1091 : CopyFromState cstate;
2208 peter_e 1092 ECB : List *attnamelist;
2183 1093 : ParseState *pstate;
17 akapila 1094 GNC 155 : List *options = NIL;
1095 :
1096 : /* Get the publisher relation info. */
2208 peter_e 1097 GIC 155 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
411 akapila 1098 155 : RelationGetRelationName(rel), &lrel, &qual);
1099 :
1100 : /* Put the relation into relmap. */
2208 peter_e 1101 155 : logicalrep_relmap_update(&lrel);
1102 :
2208 peter_e 1103 ECB : /* Map the publisher relation to local one. */
2208 peter_e 1104 GIC 155 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
2208 peter_e 1105 CBC 155 : Assert(rel == relmapentry->localrel);
2208 peter_e 1106 EUB :
1107 : /* Start copy on the publisher. */
2208 peter_e 1108 GIC 155 : initStringInfo(&cmd);
1109 :
1110 : /* Regular table with no row filter */
411 akapila 1111 155 : if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1112 : {
379 tomas.vondra 1113 132 : appendStringInfo(&cmd, "COPY %s (",
1116 peter 1114 132 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1115 :
1116 : /*
332 tgl 1117 ECB : * XXX Do we need to list the columns in all cases? Maybe we're
1118 : * replicating all columns?
1119 : */
379 tomas.vondra 1120 CBC 352 : for (int i = 0; i < lrel.natts; i++)
1121 : {
1122 220 : if (i > 0)
1123 88 : appendStringInfoString(&cmd, ", ");
1124 :
379 tomas.vondra 1125 GIC 220 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1126 : }
379 tomas.vondra 1127 ECB :
215 drowley 1128 GNC 132 : appendStringInfoString(&cmd, ") TO STDOUT");
379 tomas.vondra 1129 ECB : }
1116 peter 1130 : else
1131 : {
1132 : /*
1133 : * For non-tables and tables with row filters, we need to do COPY
1134 : * (SELECT ...), but we can't just do SELECT * because we need to not
411 akapila 1135 : * copy generated columns. For tables with any row filters, build a
1136 : * SELECT query with OR'ed row filters for COPY.
1116 peter 1137 : */
906 drowley 1138 GIC 23 : appendStringInfoString(&cmd, "COPY (SELECT ");
1116 peter 1139 CBC 58 : for (int i = 0; i < lrel.natts; i++)
1140 : {
1116 peter 1141 GIC 35 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1116 peter 1142 CBC 35 : if (i < lrel.natts - 1)
1143 12 : appendStringInfoString(&cmd, ", ");
1144 : }
1145 :
411 akapila 1146 GIC 23 : appendStringInfoString(&cmd, " FROM ");
1147 :
1148 : /*
1149 : * For regular tables, make sure we don't copy data from a child that
1150 : * inherits the named table as those will be copied separately.
411 akapila 1151 ECB : */
411 akapila 1152 GIC 23 : if (lrel.relkind == RELKIND_RELATION)
1153 7 : appendStringInfoString(&cmd, "ONLY ");
1154 :
411 akapila 1155 CBC 23 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1156 : /* list of OR'ed filters */
411 akapila 1157 GIC 23 : if (qual != NIL)
1158 : {
1159 : ListCell *lc;
1160 10 : char *q = strVal(linitial(qual));
411 akapila 1161 ECB :
411 akapila 1162 GIC 10 : appendStringInfo(&cmd, " WHERE %s", q);
1163 11 : for_each_from(lc, qual, 1)
411 akapila 1164 ECB : {
411 akapila 1165 CBC 1 : q = strVal(lfirst(lc));
411 akapila 1166 GIC 1 : appendStringInfo(&cmd, " OR %s", q);
1167 : }
411 akapila 1168 CBC 10 : list_free_deep(qual);
1169 : }
1170 :
1171 23 : appendStringInfoString(&cmd, ") TO STDOUT");
1116 peter 1172 ECB : }
1173 :
1174 : /*
1175 : * Prior to v16, initial table synchronization will use text format even
1176 : * if the binary option is enabled for a subscription.
1177 : */
17 akapila 1178 GNC 155 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1179 155 : MySubscription->binary)
1180 : {
1181 5 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1182 5 : options = list_make1(makeDefElem("format",
1183 : (Node *) makeString("binary"), -1));
1184 : }
1185 :
697 alvherre 1186 GIC 155 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
2208 peter_e 1187 155 : pfree(cmd.data);
2208 peter_e 1188 CBC 155 : if (res->status != WALRCV_OK_COPY_OUT)
2208 peter_e 1189 UIC 0 : ereport(ERROR,
1190 : (errcode(ERRCODE_CONNECTION_FAILURE),
662 tgl 1191 ECB : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1192 : lrel.nspname, lrel.relname, res->err)));
2208 peter_e 1193 CBC 155 : walrcv_clear_result(res);
2208 peter_e 1194 ECB :
2208 peter_e 1195 GIC 155 : copybuf = makeStringInfo();
1196 :
2183 1197 155 : pstate = make_parsestate(NULL);
1193 tgl 1198 155 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1199 : NULL, false, false);
2183 peter_e 1200 ECB :
2208 peter_e 1201 GIC 155 : attnamelist = make_copy_attnamelist(relmapentry);
17 akapila 1202 GNC 155 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
2208 peter_e 1203 ECB :
1204 : /* Do the copy */
2208 peter_e 1205 CBC 154 : (void) CopyFrom(cstate);
1206 :
2208 peter_e 1207 GIC 148 : logicalrep_rel_close(relmapentry, NoLock);
2208 peter_e 1208 CBC 148 : }
1209 :
1210 : /*
1211 : * Determine the tablesync slot name.
1212 : *
1213 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1214 : * on slot name length. We append system_identifier to avoid slot_name
1215 : * collision with subscriptions in other clusters. With the current scheme
1216 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1217 : * length of slot_name will be 50.
786 akapila 1218 ECB : *
783 1219 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1220 : * the given size.
786 1221 : *
1222 : * Note: We don't use the subscription slot name as part of tablesync slot name
1223 : * because we are responsible for cleaning up these slots and it could become
1224 : * impossible to recalculate what name to cleanup if the subscription slot name
1225 : * had changed.
1226 : */
1227 : void
786 akapila 1228 GIC 305 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1229 : char *syncslotname, Size szslot)
1230 : {
783 1231 305 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
783 akapila 1232 ECB : relid, GetSystemIdentifier());
786 akapila 1233 CBC 305 : }
1234 :
1235 : /*
2208 peter_e 1236 ECB : * Start syncing the table in the sync worker.
1237 : *
1238 : * If nothing needs to be done to sync the table, we exit the worker without
906 alvherre 1239 : * any further action.
1240 : *
1241 : * The returned slot name is palloc'ed in current memory context.
1242 : */
1243 : char *
2208 peter_e 1244 GIC 156 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1245 : {
2153 bruce 1246 ECB : char *slotname;
1247 : char *err;
1248 : char relstate;
2180 fujii 1249 : XLogRecPtr relstate_lsn;
906 alvherre 1250 : Relation rel;
1251 : AclResult aclresult;
1252 : WalRcvExecResult *res;
1253 : char originname[NAMEDATALEN];
786 akapila 1254 : RepOriginId originid;
1255 : bool must_use_password;
2208 peter_e 1256 :
1257 : /* Check the state of the table synchronization. */
2208 peter_e 1258 GBC 156 : StartTransactionCommand();
2180 fujii 1259 GIC 156 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1260 156 : MyLogicalRepWorker->relid,
1261 : &relstate_lsn);
2180 fujii 1262 CBC 156 : CommitTransactionCommand();
1263 :
2208 peter_e 1264 156 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
2180 fujii 1265 GIC 156 : MyLogicalRepWorker->relstate = relstate;
2180 fujii 1266 CBC 156 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
2208 peter_e 1267 156 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1268 :
1269 : /*
906 alvherre 1270 ECB : * If synchronization is already done or no longer necessary, exit now
1271 : * that we've updated shared memory state.
1272 : */
906 alvherre 1273 GIC 156 : switch (relstate)
906 alvherre 1274 ECB : {
906 alvherre 1275 UIC 0 : case SUBREL_STATE_SYNCDONE:
906 alvherre 1276 ECB : case SUBREL_STATE_READY:
1277 : case SUBREL_STATE_UNKNOWN:
906 alvherre 1278 UIC 0 : finish_sync_worker(); /* doesn't return */
1279 : }
1280 :
1281 : /* Calculate the name of the tablesync slot. */
783 akapila 1282 GIC 156 : slotname = (char *) palloc(NAMEDATALEN);
1283 156 : ReplicationSlotNameForTablesync(MySubscription->oid,
1284 156 : MyLogicalRepWorker->relid,
1285 : slotname,
1286 : NAMEDATALEN);
1287 :
1288 : /* Is the use of a password mandatory? */
10 rhaas 1289 GNC 309 : must_use_password = MySubscription->passwordrequired &&
1290 153 : !superuser_arg(MySubscription->owner);
1291 :
1292 : /*
1293 : * Here we use the slot name instead of the subscription name as the
1294 : * application_name, so that it is different from the leader apply worker,
1295 : * so that synchronous replication can distinguish them.
1296 : */
697 alvherre 1297 GIC 156 : LogRepWorkerWalRcvConn =
10 rhaas 1298 GNC 156 : walrcv_connect(MySubscription->conninfo, true,
1299 : must_use_password,
1300 : slotname, &err);
697 alvherre 1301 GIC 156 : if (LogRepWorkerWalRcvConn == NULL)
2208 peter_e 1302 UIC 0 : ereport(ERROR,
662 tgl 1303 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
1304 : errmsg("could not connect to the publisher: %s", err)));
1305 :
906 alvherre 1306 CBC 156 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1307 : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
786 akapila 1308 ECB : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1309 :
1310 : /* Assign the origin tracking record name. */
180 akapila 1311 GNC 156 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1312 156 : MyLogicalRepWorker->relid,
1313 : originname,
1314 : sizeof(originname));
1315 :
786 akapila 1316 GIC 156 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1317 : {
1318 : /*
786 akapila 1319 ECB : * We have previously errored out before finishing the copy so the
1320 : * replication slot might exist. We want to remove the slot if it
1321 : * already exists and proceed.
1322 : *
1323 : * XXX We could also instead try to drop the slot, last time we failed
1324 : * but for that, we might need to clean up the copy state as it might
1325 : * be in the middle of fetching the rows. Also, if there is a network
1326 : * breakdown then it wouldn't have succeeded so trying it next time
1327 : * seems like a better bet.
1328 : */
697 alvherre 1329 GIC 6 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1330 : }
786 akapila 1331 150 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1332 : {
786 akapila 1333 ECB : /*
1334 : * The COPY phase was previously done, but tablesync then crashed
1335 : * before it was able to finish normally.
1336 : */
786 akapila 1337 CBC 1 : StartTransactionCommand();
1338 :
786 akapila 1339 ECB : /*
1340 : * The origin tracking name must already exist. It was created first
1341 : * time this tablesync was launched.
1342 : */
786 akapila 1343 GIC 1 : originid = replorigin_by_name(originname, false);
90 akapila 1344 GNC 1 : replorigin_session_setup(originid, 0);
786 akapila 1345 GIC 1 : replorigin_session_origin = originid;
1346 1 : *origin_startpos = replorigin_session_get_progress(false);
1347 :
786 akapila 1348 CBC 1 : CommitTransactionCommand();
1349 :
786 akapila 1350 GBC 1 : goto copy_table_done;
1351 : }
1352 :
906 alvherre 1353 155 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
906 alvherre 1354 GIC 155 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1355 155 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1356 155 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
2208 peter_e 1357 ECB :
906 alvherre 1358 : /* Update the state and make it visible to others. */
906 alvherre 1359 CBC 155 : StartTransactionCommand();
906 alvherre 1360 GIC 155 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1361 155 : MyLogicalRepWorker->relid,
1362 155 : MyLogicalRepWorker->relstate,
1363 155 : MyLogicalRepWorker->relstate_lsn);
906 alvherre 1364 CBC 155 : CommitTransactionCommand();
368 andres 1365 155 : pgstat_report_stat(true);
1366 :
906 alvherre 1367 GIC 155 : StartTransactionCommand();
1368 :
1369 : /*
1370 : * Use a standard write lock here. It might be better to disallow access
1371 : * to the table while it's being synchronized. But we don't want to block
906 alvherre 1372 ECB : * the main apply process from working and it has to open the relation in
1373 : * RowExclusiveLock when remapping remote relation id to local one.
1374 : */
906 alvherre 1375 GIC 155 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
2208 peter_e 1376 ECB :
457 jdavis 1377 EUB : /*
1378 : * Check that our table sync worker has permission to insert into the
1379 : * target table.
1380 : */
457 jdavis 1381 CBC 155 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1382 : ACL_INSERT);
457 jdavis 1383 GIC 155 : if (aclresult != ACLCHECK_OK)
457 jdavis 1384 UIC 0 : aclcheck_error(aclresult,
1385 0 : get_relkind_objtype(rel->rd_rel->relkind),
457 jdavis 1386 LBC 0 : RelationGetRelationName(rel));
457 jdavis 1387 ECB :
1388 : /*
1389 : * COPY FROM does not honor RLS policies. That is not a problem for
1390 : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
332 tgl 1391 : * who has it implicitly), but other roles should not be able to
1392 : * circumvent RLS. Disallow logical replication into RLS enabled
1393 : * relations for such roles.
1394 : */
457 jdavis 1395 GIC 155 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
457 jdavis 1396 UIC 0 : ereport(ERROR,
1397 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1398 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1399 : GetUserNameFromId(GetUserId(), true),
1400 : RelationGetRelationName(rel))));
1401 :
1402 : /*
1403 : * Start a transaction in the remote node in REPEATABLE READ mode. This
906 alvherre 1404 ECB : * ensures that both the replication slot we create (see below) and the
1405 : * COPY are consistent with each other.
1406 : */
697 alvherre 1407 GIC 155 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1408 : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1409 : 0, NULL);
906 1410 155 : if (res->status != WALRCV_OK_COMMAND)
906 alvherre 1411 UIC 0 : ereport(ERROR,
662 tgl 1412 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
1413 : errmsg("table copy could not start transaction on publisher: %s",
1414 : res->err)));
906 alvherre 1415 GIC 155 : walrcv_clear_result(res);
1416 :
1417 : /*
786 akapila 1418 ECB : * Create a new permanent logical decoding slot. This slot will be used
906 alvherre 1419 : * for the catchup phase after COPY is done, so tell it to use the
1420 : * snapshot to make the final data consistent.
1421 : */
634 akapila 1422 GIC 155 : walrcv_create_slot(LogRepWorkerWalRcvConn,
634 akapila 1423 ECB : slotname, false /* permanent */ , false /* two_phase */ ,
1424 : CRS_USE_SNAPSHOT, origin_startpos);
2208 peter_e 1425 :
1426 : /*
1427 : * Setup replication origin tracking. The purpose of doing this before the
786 akapila 1428 : * copy is to avoid doing the copy again due to any error in setting up
1429 : * origin tracking.
1430 : */
786 akapila 1431 CBC 155 : originid = replorigin_by_name(originname, true);
786 akapila 1432 GIC 155 : if (!OidIsValid(originid))
1433 : {
786 akapila 1434 ECB : /*
1435 : * Origin tracking does not exist, so create it now.
1436 : *
1437 : * Then advance to the LSN got from walrcv_create_slot. This is WAL
1438 : * logged for the purpose of recovery. Locks are to prevent the
1439 : * replication origin from vanishing while advancing.
1440 : */
786 akapila 1441 GIC 155 : originid = replorigin_create(originname);
786 akapila 1442 ECB :
786 akapila 1443 GIC 155 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1444 155 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1445 : true /* go backward */ , true /* WAL log */ );
1446 155 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1447 :
90 akapila 1448 GNC 155 : replorigin_session_setup(originid, 0);
786 akapila 1449 GIC 155 : replorigin_session_origin = originid;
786 akapila 1450 ECB : }
1451 : else
1452 : {
786 akapila 1453 UIC 0 : ereport(ERROR,
1454 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1455 : errmsg("replication origin \"%s\" already exists",
786 akapila 1456 ECB : originname)));
1457 : }
1458 :
367 tomas.vondra 1459 EUB : /* Now do the initial data copy */
367 tomas.vondra 1460 GBC 155 : PushActiveSnapshot(GetTransactionSnapshot());
1461 155 : copy_table(rel);
367 tomas.vondra 1462 GIC 148 : PopActiveSnapshot();
1463 :
697 alvherre 1464 148 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
906 1465 148 : if (res->status != WALRCV_OK_COMMAND)
906 alvherre 1466 UIC 0 : ereport(ERROR,
1467 : (errcode(ERRCODE_CONNECTION_FAILURE),
1468 : errmsg("table copy could not finish transaction on publisher: %s",
1469 : res->err)));
906 alvherre 1470 CBC 148 : walrcv_clear_result(res);
2208 peter_e 1471 EUB :
906 alvherre 1472 GIC 148 : table_close(rel, NoLock);
1473 :
1474 : /* Make the copy visible. */
1475 148 : CommandCounterIncrement();
1476 :
1477 : /*
1478 : * Update the persisted state to indicate the COPY phase is done; make it
1479 : * visible to others.
1480 : */
786 akapila 1481 148 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
786 akapila 1482 CBC 148 : MyLogicalRepWorker->relid,
1483 : SUBREL_STATE_FINISHEDCOPY,
786 akapila 1484 GIC 148 : MyLogicalRepWorker->relstate_lsn);
786 akapila 1485 ECB :
786 akapila 1486 GBC 148 : CommitTransactionCommand();
1487 :
786 akapila 1488 GIC 149 : copy_table_done:
1489 :
786 akapila 1490 CBC 149 : elog(DEBUG1,
1491 : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1492 : originname, LSN_FORMAT_ARGS(*origin_startpos));
1493 :
1494 : /*
1495 : * We are done with the initial data synchronization, update the state.
1496 : */
906 alvherre 1497 149 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
906 alvherre 1498 GIC 149 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1499 149 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1500 149 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1501 :
1502 : /*
1503 : * Finally, wait until the leader apply worker tells us to catch up and
1504 : * then return to let LogicalRepApplyLoop do it.
1505 : */
906 alvherre 1506 CBC 149 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
2208 peter_e 1507 149 : return slotname;
1508 : }
1509 :
1510 : /*
1511 : * Common code to fetch the up-to-date sync state info into the static lists.
1512 : *
1513 : * Returns true if subscription has 1 or more tables, else false.
1514 : *
1515 : * Note: If this function started the transaction (indicated by the parameter)
634 akapila 1516 ECB : * then it is the caller's responsibility to commit it.
1517 : */
1518 : static bool
634 akapila 1519 CBC 2534 : FetchTableStates(bool *started_tx)
1520 : {
634 akapila 1521 ECB : static bool has_subrels = false;
1522 :
634 akapila 1523 CBC 2534 : *started_tx = false;
634 akapila 1524 ECB :
634 akapila 1525 GIC 2534 : if (!table_states_valid)
1526 : {
1527 : MemoryContext oldctx;
634 akapila 1528 EUB : List *rstates;
1529 : ListCell *lc;
1530 : SubscriptionRelState *rstate;
1531 :
1532 : /* Clean the old lists. */
634 akapila 1533 GIC 664 : list_free_deep(table_states_not_ready);
1534 664 : table_states_not_ready = NIL;
634 akapila 1535 ECB :
634 akapila 1536 CBC 664 : if (!IsTransactionState())
634 akapila 1537 ECB : {
634 akapila 1538 GIC 651 : StartTransactionCommand();
634 akapila 1539 CBC 651 : *started_tx = true;
634 akapila 1540 ECB : }
634 akapila 1541 EUB :
1542 : /* Fetch all non-ready tables. */
256 michael 1543 GNC 664 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1544 :
634 akapila 1545 ECB : /* Allocate the tracking info in a permanent memory context. */
634 akapila 1546 GIC 664 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
634 akapila 1547 CBC 1855 : foreach(lc, rstates)
1548 : {
634 akapila 1549 GIC 1191 : rstate = palloc(sizeof(SubscriptionRelState));
634 akapila 1550 CBC 1191 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
634 akapila 1551 GIC 1191 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1552 : }
1553 664 : MemoryContextSwitchTo(oldctx);
1554 :
1555 : /*
634 akapila 1556 ECB : * Does the subscription have tables?
1557 : *
1558 : * If there were not-READY relations found then we know it does. But
1559 : * if table_state_not_ready was empty we still need to check again to
1560 : * see if there are 0 tables.
1561 : */
235 tgl 1562 GNC 831 : has_subrels = (table_states_not_ready != NIL) ||
634 akapila 1563 CBC 167 : HasSubscriptionRelations(MySubscription->oid);
1564 :
1565 664 : table_states_valid = true;
1566 : }
1567 :
634 akapila 1568 GIC 2534 : return has_subrels;
1569 : }
1570 :
1571 : /*
634 akapila 1572 ECB : * If the subscription has no tables then return false.
1573 : *
1574 : * Otherwise, are all tablesyncs READY?
1575 : *
1576 : * Note: This function is not suitable to be called from outside of apply or
1577 : * tablesync workers because MySubscription needs to be already initialized.
1578 : */
1579 : bool
634 akapila 1580 GIC 58 : AllTablesyncsReady(void)
634 akapila 1581 ECB : {
634 akapila 1582 CBC 58 : bool started_tx = false;
634 akapila 1583 GIC 58 : bool has_subrels = false;
1584 :
1585 : /* We need up-to-date sync state info for subscription tables here. */
1586 58 : has_subrels = FetchTableStates(&started_tx);
1587 :
1588 58 : if (started_tx)
1589 : {
1590 9 : CommitTransactionCommand();
368 andres 1591 9 : pgstat_report_stat(true);
1592 : }
1593 :
634 akapila 1594 ECB : /*
1595 : * Return false when there are no tables in subscription or not all tables
1596 : * are in ready state; true otherwise.
1597 : */
235 tgl 1598 GNC 58 : return has_subrels && (table_states_not_ready == NIL);
1599 : }
634 akapila 1600 ECB :
1601 : /*
1602 : * Update the two_phase state of the specified subscription in pg_subscription.
1603 : */
1604 : void
634 akapila 1605 GIC 4 : UpdateTwoPhaseState(Oid suboid, char new_state)
1606 : {
1607 : Relation rel;
634 akapila 1608 ECB : HeapTuple tup;
1609 : bool nulls[Natts_pg_subscription];
1610 : bool replaces[Natts_pg_subscription];
1611 : Datum values[Natts_pg_subscription];
1612 :
634 akapila 1613 CBC 4 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
634 akapila 1614 ECB : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1615 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1616 :
634 akapila 1617 GIC 4 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
634 akapila 1618 CBC 4 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
634 akapila 1619 GIC 4 : if (!HeapTupleIsValid(tup))
634 akapila 1620 UIC 0 : elog(ERROR,
634 akapila 1621 ECB : "cache lookup failed for subscription oid %u",
1622 : suboid);
1623 :
1624 : /* Form a new tuple. */
634 akapila 1625 CBC 4 : memset(values, 0, sizeof(values));
1626 4 : memset(nulls, false, sizeof(nulls));
634 akapila 1627 GIC 4 : memset(replaces, false, sizeof(replaces));
634 akapila 1628 ECB :
1629 : /* And update/set two_phase state */
634 akapila 1630 GIC 4 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1631 4 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1632 :
1633 4 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1634 : values, nulls, replaces);
1635 4 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1636 :
634 akapila 1637 CBC 4 : heap_freetuple(tup);
1638 4 : table_close(rel, RowExclusiveLock);
634 akapila 1639 GIC 4 : }
|