Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * tablesync.c
3 : : * PostgreSQL logical replication: initial table data synchronization
4 : : *
5 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/tablesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for initial table data synchronization for
12 : : * logical replication.
13 : : *
14 : : * The initial data synchronization is done separately for each table,
15 : : * in a separate apply worker that only fetches the initial snapshot data
16 : : * from the publisher and then synchronizes the position in the stream with
17 : : * the leader apply worker.
18 : : *
19 : : * There are several reasons for doing the synchronization this way:
20 : : * - It allows us to parallelize the initial data synchronization
21 : : * which lowers the time needed for it to happen.
22 : : * - The initial synchronization does not have to hold the xid and LSN
23 : : * for the time it takes to copy data of all tables, causing less
24 : : * bloat and lower disk consumption compared to doing the
25 : : * synchronization in a single process for the whole database.
26 : : * - It allows us to synchronize any tables added after the initial
27 : : * synchronization has finished.
28 : : *
29 : : * The stream position synchronization works in multiple steps:
30 : : * - Apply worker requests a tablesync worker to start, setting the new
31 : : * table state to INIT.
32 : : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : : * copying.
34 : : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : : * worker specific) state to indicate when the copy phase has completed, so
36 : : * if the worker crashes with this (non-memory) state then the copy will not
37 : : * be re-attempted.
38 : : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : : * until either the table state is set to SYNCDONE or the sync worker
42 : : * exits.
43 : : * - After the sync worker has seen the state change to CATCHUP, it will
44 : : * read the stream and apply changes (acting like an apply worker) until
45 : : * it catches up to the specified stream position. Then it sets the
46 : : * state to SYNCDONE. There might be zero changes applied between
47 : : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : : * apply worker.
49 : : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : : * the table until it reaches the SYNCDONE stream position, at which
51 : : * point it sets state to READY and stops tracking. Again, there might
52 : : * be zero changes in between.
53 : : *
54 : : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : : *
57 : : * The catalog pg_subscription_rel is used to keep information about
58 : : * subscribed tables and their state. The catalog holds all states
59 : : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : : *
61 : : * Example flows look like this:
62 : : * - Apply is in front:
63 : : * sync:8
64 : : * -> set in catalog FINISHEDCOPY
65 : : * -> set in memory SYNCWAIT
66 : : * apply:10
67 : : * -> set in memory CATCHUP
68 : : * -> enter wait-loop
69 : : * sync:10
70 : : * -> set in catalog SYNCDONE
71 : : * -> exit
72 : : * apply:10
73 : : * -> exit wait-loop
74 : : * -> continue rep
75 : : * apply:11
76 : : * -> set in catalog READY
77 : : *
78 : : * - Sync is in front:
79 : : * sync:10
80 : : * -> set in catalog FINISHEDCOPY
81 : : * -> set in memory SYNCWAIT
82 : : * apply:8
83 : : * -> set in memory CATCHUP
84 : : * -> continue per-table filtering
85 : : * sync:10
86 : : * -> set in catalog SYNCDONE
87 : : * -> exit
88 : : * apply:10
89 : : * -> set in catalog READY
90 : : * -> stop per-table filtering
91 : : * -> continue rep
92 : : *-------------------------------------------------------------------------
93 : : */
94 : :
95 : : #include "postgres.h"
96 : :
97 : : #include "access/table.h"
98 : : #include "access/xact.h"
99 : : #include "catalog/indexing.h"
100 : : #include "catalog/pg_subscription_rel.h"
101 : : #include "catalog/pg_type.h"
102 : : #include "commands/copy.h"
103 : : #include "miscadmin.h"
104 : : #include "nodes/makefuncs.h"
105 : : #include "parser/parse_relation.h"
106 : : #include "pgstat.h"
107 : : #include "replication/logicallauncher.h"
108 : : #include "replication/logicalrelation.h"
109 : : #include "replication/logicalworker.h"
110 : : #include "replication/origin.h"
111 : : #include "replication/slot.h"
112 : : #include "replication/walreceiver.h"
113 : : #include "replication/worker_internal.h"
114 : : #include "storage/ipc.h"
115 : : #include "storage/lmgr.h"
116 : : #include "utils/acl.h"
117 : : #include "utils/array.h"
118 : : #include "utils/builtins.h"
119 : : #include "utils/lsyscache.h"
120 : : #include "utils/memutils.h"
121 : : #include "utils/rls.h"
122 : : #include "utils/snapmgr.h"
123 : : #include "utils/syscache.h"
124 : : #include "utils/usercontext.h"
125 : :
126 : : static bool table_states_valid = false;
127 : : static List *table_states_not_ready = NIL;
128 : : static bool FetchTableStates(bool *started_tx);
129 : :
130 : : static StringInfo copybuf = NULL;
131 : :
/*
 * Exit routine for synchronization worker.
 *
 * Commits any open transaction (reporting stats), flushes WAL up to the
 * current write position, logs that the table sync has finished, wakes the
 * leader apply worker of this subscription and then terminates the process
 * with proc_exit(0).  Never returns.
 */
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
	/*
	 * Commit any outstanding transaction. This is the usual case, unless
	 * there was nothing to do for the table.
	 */
	if (IsTransactionState())
	{
		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	/* And flush all writes. */
	XLogFlush(GetXLogWriteRecPtr());

	/*
	 * A short transaction wraps the log message, presumably because
	 * get_rel_name() needs catalog access.
	 */
	StartTransactionCommand();
	ereport(LOG,
			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
					MySubscription->name,
					get_rel_name(MyLogicalRepWorker->relid))));
	CommitTransactionCommand();

	/*
	 * Find the leader apply worker and signal it.  InvalidOid as the relid
	 * selects the worker that is not tied to a specific table.
	 */
	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

	/* Stop gracefully */
	proc_exit(0);
}
165 : :
166 : : /*
167 : : * Wait until the relation sync state is set in the catalog to the expected
168 : : * one; return true when it happens.
169 : : *
170 : : * Returns false if the table sync worker or the table itself have
171 : : * disappeared, or the table state has been reset.
172 : : *
173 : : * Currently, this is used in the apply worker when transitioning from
174 : : * CATCHUP state to SYNCDONE.
175 : : */
176 : : static bool
2504 177 : 160 : wait_for_relation_state_change(Oid relid, char expected_state)
178 : : {
179 : : char state;
180 : :
181 : : for (;;)
2579 182 : 179 : {
183 : : LogicalRepWorker *worker;
184 : : XLogRecPtr statelsn;
185 : :
2508 186 [ - + ]: 339 : CHECK_FOR_INTERRUPTS();
187 : :
1277 alvherre@alvh.no-ip. 188 : 339 : InvalidateCatalogSnapshot();
2504 peter_e@gmx.net 189 : 339 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
190 : : relid, &statelsn);
191 : :
192 [ - + ]: 339 : if (state == SUBREL_STATE_UNKNOWN)
1277 alvherre@alvh.no-ip. 193 :UBC 0 : break;
194 : :
2504 peter_e@gmx.net 195 [ + + ]:CBC 339 : if (state == expected_state)
196 : 160 : return true;
197 : :
198 : : /* Check if the sync worker is still running and bail if not. */
2579 199 : 179 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1277 alvherre@alvh.no-ip. 200 : 179 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
201 : : false);
2504 peter_e@gmx.net 202 : 179 : LWLockRelease(LogicalRepWorkerLock);
2579 203 [ - + ]: 179 : if (!worker)
1277 alvherre@alvh.no-ip. 204 :UBC 0 : break;
205 : :
1969 tmunro@postgresql.or 206 :CBC 179 : (void) WaitLatch(MyLatch,
207 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
208 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
209 : :
2504 andres@anarazel.de 210 : 179 : ResetLatch(MyLatch);
211 : : }
212 : :
2504 peter_e@gmx.net 213 :UBC 0 : return false;
214 : : }
215 : :
216 : : /*
217 : : * Wait until the apply worker changes the state of our synchronization
218 : : * worker to the expected one.
219 : : *
220 : : * Used when transitioning from SYNCWAIT state to CATCHUP.
221 : : *
222 : : * Returns false if the apply worker has disappeared.
223 : : */
224 : : static bool
2504 peter_e@gmx.net 225 :CBC 160 : wait_for_worker_state_change(char expected_state)
226 : : {
227 : : int rc;
228 : :
229 : : for (;;)
230 : 160 : {
231 : : LogicalRepWorker *worker;
232 : :
233 [ - + ]: 320 : CHECK_FOR_INTERRUPTS();
234 : :
235 : : /*
236 : : * Done if already in correct state. (We assume this fetch is atomic
237 : : * enough to not give a misleading answer if we do it with no lock.)
238 : : */
2480 tgl@sss.pgh.pa.us 239 [ + + ]: 320 : if (MyLogicalRepWorker->relstate == expected_state)
240 : 160 : return true;
241 : :
242 : : /*
243 : : * Bail out if the apply worker has died, else signal it we're
244 : : * waiting.
245 : : */
2504 peter_e@gmx.net 246 : 160 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
247 : 160 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
248 : : InvalidOid, false);
2480 tgl@sss.pgh.pa.us 249 [ + - + - ]: 160 : if (worker && worker->proc)
250 : 160 : logicalrep_worker_wakeup_ptr(worker);
2504 peter_e@gmx.net 251 : 160 : LWLockRelease(LogicalRepWorkerLock);
252 [ - + ]: 160 : if (!worker)
2480 tgl@sss.pgh.pa.us 253 :UBC 0 : break;
254 : :
255 : : /*
256 : : * Wait. We expect to get a latch signal back from the apply worker,
257 : : * but use a timeout in case it dies without sending one.
258 : : */
2504 andres@anarazel.de 259 :CBC 160 : rc = WaitLatch(MyLatch,
260 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
261 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
262 : :
2480 tgl@sss.pgh.pa.us 263 [ + - ]: 160 : if (rc & WL_LATCH_SET)
264 : 160 : ResetLatch(MyLatch);
265 : : }
266 : :
2579 peter_e@gmx.net 267 :UBC 0 : return false;
268 : : }
269 : :
270 : : /*
271 : : * Callback from syscache invalidation.
272 : : */
273 : : void
2579 peter_e@gmx.net 274 :CBC 1548 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
275 : : {
276 : 1548 : table_states_valid = false;
277 : 1548 : }
278 : :
/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 *
 * current_lsn is the stream position this worker has reached.  When the
 * condition above holds, this function does the following, then calls
 * finish_sync_worker() and so never returns:
 *   - records SYNCDONE both in shared memory and in pg_subscription_rel;
 *   - drops the tablesync slot on the publisher;
 *   - drops the tablesync replication origin.
 * Otherwise it simply returns.
 */
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
	/* relstate/relstate_lsn are read and updated under relmutex. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
		current_lsn >= MyLogicalRepWorker->relstate_lsn)
	{
		TimeLineID	tli;		/* filled by walrcv_endstreaming, unused */
		char		syncslotname[NAMEDATALEN] = {0};
		char		originname[NAMEDATALEN] = {0};

		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
		MyLogicalRepWorker->relstate_lsn = current_lsn;

		/* The spinlock is released before the catalog/network work below. */
		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		/*
		 * UpdateSubscriptionRelState must be called within a transaction.
		 */
		if (!IsTransactionState())
			StartTransactionCommand();

		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
								   MyLogicalRepWorker->relid,
								   MyLogicalRepWorker->relstate,
								   MyLogicalRepWorker->relstate_lsn);

		/*
		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
		 * the slot.
		 */
		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);

		/*
		 * Cleanup the tablesync slot.
		 *
		 * This has to be done after updating the state because otherwise if
		 * there is an error while doing the database operations we won't be
		 * able to rollback dropped slot.
		 */
		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
										MyLogicalRepWorker->relid,
										syncslotname,
										sizeof(syncslotname));

		/*
		 * It is important to give an error if we are unable to drop the slot,
		 * otherwise, it won't be dropped till the corresponding subscription
		 * is dropped. So passing missing_ok = false.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);

		/* Make the SYNCDONE catalog update durable before origin cleanup. */
		CommitTransactionCommand();
		pgstat_report_stat(false);

		/*
		 * Start a new transaction to clean up the tablesync origin tracking.
		 * This transaction will be ended within the finish_sync_worker().
		 * Even if we fail to remove the origin here, the apply worker will
		 * clean it up afterward.
		 *
		 * We need to do this after the table state is set to SYNCDONE.
		 * Otherwise, if an error occurs while performing the database
		 * operation, the worker will be restarted and the in-memory state of
		 * replication progress (remote_lsn) won't be rolled-back which would
		 * have been cleared before restart. So, the restarted worker will use
		 * invalid replication progress state resulting in replay of
		 * transactions that have already been applied.
		 */
		StartTransactionCommand();

		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   originname,
										   sizeof(originname));

		/*
		 * Resetting the origin session removes the ownership of the slot.
		 * This is needed to allow the origin to be dropped.
		 */
		replorigin_session_reset();
		replorigin_session_origin = InvalidRepOriginId;
		replorigin_session_origin_lsn = InvalidXLogRecPtr;
		replorigin_session_origin_timestamp = 0;

		/*
		 * Drop the tablesync's origin tracking if exists.
		 *
		 * There is a chance that the user is concurrently performing refresh
		 * for the subscription where we remove the table state and its origin
		 * or the apply worker would have removed this origin. So passing
		 * missing_ok = true.
		 */
		replorigin_drop_by_name(originname, true, false);

		/* Commits the open transaction and exits; does not return. */
		finish_sync_worker();
	}
	else
		SpinLockRelease(&MyLogicalRepWorker->relmutex);
}
388 : :
/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers). To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info. We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished. This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 *
 * Must be called outside a transaction (asserted below).  Any transaction
 * started here is committed before returning.  The process exits via
 * proc_exit(0) instead of returning when two_phase is still PENDING and
 * all tablesyncs are READY, so that the restarted worker can enable
 * two_phase.
 */
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
	/* Hash entry: relid is the key (keysize = sizeof(Oid)), so it is first. */
	struct tablesync_start_time_mapping
	{
		Oid			relid;
		TimestampTz last_start_time;
	};

	/* Function-static: survives across calls while tables remain unsynced. */
	static HTAB *last_start_times = NULL;
	ListCell   *lc;
	bool		started_tx = false;
	bool		should_exit = false;

	Assert(!IsTransactionState());

	/*
	 * We need up-to-date sync state info for subscription tables here.
	 * started_tx presumably gets set if FetchTableStates had to open a
	 * transaction; it is committed at the end of this function.
	 */
	FetchTableStates(&started_tx);

	/*
	 * Prepare a hash table for tracking last start times of workers, to avoid
	 * immediate restarts.  We don't need it if there are no tables that need
	 * syncing.
	 */
	if (table_states_not_ready != NIL && !last_start_times)
	{
		HASHCTL		ctl;

		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
		last_start_times = hash_create("Logical replication table sync worker start times",
									   256, &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Clean up the hash table when we're done with all tables (just to
	 * release the bit of memory).
	 */
	else if (table_states_not_ready == NIL && last_start_times)
	{
		hash_destroy(last_start_times);
		last_start_times = NULL;
	}

	/*
	 * Process all tables that are being synchronized.
	 */
	foreach(lc, table_states_not_ready)
	{
		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

		if (rstate->state == SUBREL_STATE_SYNCDONE)
		{
			/*
			 * Apply has caught up to the position where the table sync has
			 * finished.  Mark the table as ready so that the apply will just
			 * continue to replicate it normally.
			 */
			if (current_lsn >= rstate->lsn)
			{
				char		originname[NAMEDATALEN];

				rstate->state = SUBREL_STATE_READY;
				rstate->lsn = current_lsn;

				/* Catalog updates below need a transaction. */
				if (!started_tx)
				{
					StartTransactionCommand();
					started_tx = true;
				}

				/*
				 * Remove the tablesync origin tracking if exists.
				 *
				 * There is a chance that the user is concurrently performing
				 * refresh for the subscription where we remove the table
				 * state and its origin or the tablesync worker would have
				 * already removed this origin. We can't rely on tablesync
				 * worker to remove the origin tracking as if there is any
				 * error while dropping we won't restart it to drop the
				 * origin. So passing missing_ok = true.
				 */
				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
												   rstate->relid,
												   originname,
												   sizeof(originname));
				replorigin_drop_by_name(originname, true, false);

				/*
				 * Update the state to READY only after the origin cleanup.
				 */
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   rstate->relid, rstate->state,
										   rstate->lsn);
			}
		}
		else
		{
			LogicalRepWorker *syncworker;

			/*
			 * Look for a sync worker for this relation.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
												rstate->relid, false);

			if (syncworker)
			{
				/* Found one, update our copy of its state */
				SpinLockAcquire(&syncworker->relmutex);
				rstate->state = syncworker->relstate;
				rstate->lsn = syncworker->relstate_lsn;
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/*
					 * Sync worker is waiting for apply.  Tell sync worker it
					 * can catchup now.  The catch-up target is the later of
					 * its own position and ours.
					 */
					syncworker->relstate = SUBREL_STATE_CATCHUP;
					syncworker->relstate_lsn =
						Max(syncworker->relstate_lsn, current_lsn);
				}
				SpinLockRelease(&syncworker->relmutex);

				/*
				 * If we told worker to catch up, wait for it.  (rstate->state
				 * still holds the SYNCWAIT value copied before we switched
				 * the worker's shared state to CATCHUP.)
				 */
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/* Signal the sync worker, as it may be waiting for us. */
					if (syncworker->proc)
						logicalrep_worker_wakeup_ptr(syncworker);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					if (started_tx)
					{
						/*
						 * We must commit the existing transaction to release
						 * the existing locks before entering a busy loop.
						 * This is required to avoid any undetected deadlocks
						 * due to any existing lock as deadlock detector won't
						 * be able to detect the waits on the latch.
						 */
						CommitTransactionCommand();
						pgstat_report_stat(false);
					}

					/*
					 * Enter busy loop and wait for synchronization worker to
					 * reach expected state (or die trying).  A fresh
					 * transaction is needed because the wait reads the
					 * catalog.  The result is ignored and rstate is not
					 * refreshed here.
					 */
					StartTransactionCommand();
					started_tx = true;

					wait_for_relation_state_change(rstate->relid,
												   SUBREL_STATE_SYNCDONE);
				}
				else
					LWLockRelease(LogicalRepWorkerLock);
			}
			else
			{
				/*
				 * If there is no sync worker for this table yet, count
				 * running sync workers for this subscription, while we have
				 * the lock.
				 */
				int			nsyncworkers =
					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);

				/* Now safe to release the LWLock */
				LWLockRelease(LogicalRepWorkerLock);

				/*
				 * If there are free sync worker slot(s), start a new sync
				 * worker for the table.
				 */
				if (nsyncworkers < max_sync_workers_per_subscription)
				{
					TimestampTz now = GetCurrentTimestamp();
					struct tablesync_start_time_mapping *hentry;
					bool		found;

					hentry = hash_search(last_start_times, &rstate->relid,
										 HASH_ENTER, &found);

					/*
					 * Launch on first sight of this relation, or once
					 * wal_retrieve_retry_interval has passed since the last
					 * launch attempt for it.
					 */
					if (!found ||
						TimestampDifferenceExceeds(hentry->last_start_time, now,
												   wal_retrieve_retry_interval))
					{
						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
												 MyLogicalRepWorker->dbid,
												 MySubscription->oid,
												 MySubscription->name,
												 MyLogicalRepWorker->userid,
												 rstate->relid,
												 DSM_HANDLE_INVALID);
						hentry->last_start_time = now;
					}
				}
			}
		}
	}

	if (started_tx)
	{
		/*
		 * Even when the two_phase mode is requested by the user, it remains
		 * as 'pending' until all tablesyncs have reached READY state.
		 *
		 * When this happens, we restart the apply worker and (if the
		 * conditions are still ok) then the two_phase tri-state will become
		 * 'enabled' at that time.
		 *
		 * Note: If the subscription has no tables then leave the state as
		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
		 * work.
		 */
		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
		{
			CommandCounterIncrement();	/* make updates visible */
			if (AllTablesyncsReady())
			{
				ereport(LOG,
						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
								MySubscription->name)));
				should_exit = true;
			}
		}

		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	/* Exit only after the transaction above has been committed. */
	if (should_exit)
	{
		/*
		 * Reset the last-start time for this worker so that the launcher will
		 * restart it without waiting for wal_retrieve_retry_interval.
		 */
		ApplyLauncherForgetWorkerStartTime(MySubscription->oid);

		proc_exit(0);
	}
}
655 : :
656 : : /*
657 : : * Process possible state change(s) of tables that are being synchronized.
658 : : */
659 : : void
660 : 4618 : process_syncing_tables(XLogRecPtr current_lsn)
661 : : {
236 akapila@postgresql.o 662 [ + + + - :GNC 4618 : switch (MyLogicalRepWorker->type)
- ]
663 : : {
664 : 26 : case WORKERTYPE_PARALLEL_APPLY:
665 : :
666 : : /*
667 : : * Skip for parallel apply workers because they only operate on
668 : : * tables that are in a READY state. See pa_can_start() and
669 : : * should_apply_changes_for_rel().
670 : : */
671 : 26 : break;
672 : :
673 : 204 : case WORKERTYPE_TABLESYNC:
674 : 204 : process_syncing_tables_for_sync(current_lsn);
675 : 44 : break;
676 : :
677 : 4388 : case WORKERTYPE_APPLY:
678 : 4388 : process_syncing_tables_for_apply(current_lsn);
679 : 4382 : break;
680 : :
236 akapila@postgresql.o 681 :UNC 0 : case WORKERTYPE_UNKNOWN:
682 : : /* Should never happen. */
683 [ # # ]: 0 : elog(ERROR, "Unknown worker type");
684 : : }
2579 peter_e@gmx.net 685 :GIC 4452 : }
686 : :
687 : : /*
688 : : * Create list of columns for COPY based on logical relation mapping.
689 : : */
690 : : static List *
2579 peter_e@gmx.net 691 :CBC 168 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
692 : : {
693 : 168 : List *attnamelist = NIL;
694 : : int i;
695 : :
2523 696 [ + + ]: 437 : for (i = 0; i < rel->remoterel.natts; i++)
697 : : {
2579 698 : 269 : attnamelist = lappend(attnamelist,
2523 699 : 269 : makeString(rel->remoterel.attnames[i]));
700 : : }
701 : :
702 : :
2579 703 : 168 : return attnamelist;
704 : : }
705 : :
706 : : /*
707 : : * Data source callback for the COPY FROM, which reads from the remote
708 : : * connection and passes the data back to our local COPY.
709 : : */
710 : : static int
711 : 13938 : copy_read_data(void *outbuf, int minread, int maxread)
712 : : {
2524 bruce@momjian.us 713 : 13938 : int bytesread = 0;
714 : : int avail;
715 : :
716 : : /* If there are some leftover data from previous read, use it. */
2579 peter_e@gmx.net 717 : 13938 : avail = copybuf->len - copybuf->cursor;
718 [ - + ]: 13938 : if (avail)
719 : : {
2579 peter_e@gmx.net 720 [ # # ]:UBC 0 : if (avail > maxread)
721 : 0 : avail = maxread;
722 : 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
723 : 0 : copybuf->cursor += avail;
724 : 0 : maxread -= avail;
725 : 0 : bytesread += avail;
726 : : }
727 : :
2508 peter_e@gmx.net 728 [ + - + - ]:CBC 13938 : while (maxread > 0 && bytesread < minread)
729 : : {
2579 730 : 13938 : pgsocket fd = PGINVALID_SOCKET;
731 : : int len;
732 : 13938 : char *buf = NULL;
733 : :
734 : : for (;;)
735 : : {
736 : : /* Try read the data. */
1068 alvherre@alvh.no-ip. 737 :UBC 0 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
738 : :
2579 peter_e@gmx.net 739 [ - + ]:CBC 13938 : CHECK_FOR_INTERRUPTS();
740 : :
741 [ - + ]: 13938 : if (len == 0)
2579 peter_e@gmx.net 742 :UBC 0 : break;
2579 peter_e@gmx.net 743 [ + + ]:CBC 13938 : else if (len < 0)
744 : 13938 : return bytesread;
745 : : else
746 : : {
747 : : /* Process the data */
748 : 13772 : copybuf->data = buf;
749 : 13772 : copybuf->len = len;
750 : 13772 : copybuf->cursor = 0;
751 : :
752 : 13772 : avail = copybuf->len - copybuf->cursor;
753 [ - + ]: 13772 : if (avail > maxread)
2579 peter_e@gmx.net 754 :UBC 0 : avail = maxread;
2579 peter_e@gmx.net 755 :CBC 13772 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
756 : 13772 : outbuf = (void *) ((char *) outbuf + avail);
757 : 13772 : copybuf->cursor += avail;
758 : 13772 : maxread -= avail;
759 : 13772 : bytesread += avail;
760 : : }
761 : :
762 [ + - + - ]: 13772 : if (maxread <= 0 || bytesread >= minread)
763 : 13772 : return bytesread;
764 : : }
765 : :
766 : : /*
767 : : * Wait for more data or latch.
768 : : */
1969 tmunro@postgresql.or 769 :UBC 0 : (void) WaitLatchOrSocket(MyLatch,
770 : : WL_SOCKET_READABLE | WL_LATCH_SET |
771 : : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
772 : : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
773 : :
2504 andres@anarazel.de 774 : 0 : ResetLatch(MyLatch);
775 : : }
776 : :
2579 peter_e@gmx.net 777 : 0 : return bytesread;
778 : : }
779 : :
780 : :
781 : : /*
782 : : * Get information about remote relation in similar fashion the RELATION
783 : : * message provides during replication. This function also returns the relation
784 : : * qualifications to be used in the COPY command.
785 : : */
786 : : static void
2579 peter_e@gmx.net 787 :CBC 169 : fetch_remote_table_info(char *nspname, char *relname,
788 : : LogicalRepRelation *lrel, List **qual)
789 : : {
790 : : WalRcvExecResult *res;
791 : : StringInfoData cmd;
792 : : TupleTableSlot *slot;
1487 peter@eisentraut.org 793 : 169 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
750 tomas.vondra@postgre 794 : 169 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
782 akapila@postgresql.o 795 : 169 : Oid qualRow[] = {TEXTOID};
796 : : bool isnull;
797 : : int natt;
798 : : ListCell *lc;
750 tomas.vondra@postgre 799 : 169 : Bitmapset *included_cols = NULL;
800 : :
2579 peter_e@gmx.net 801 : 169 : lrel->nspname = nspname;
802 : 169 : lrel->relname = relname;
803 : :
804 : : /* First fetch Oid and replica identity. */
805 : 169 : initStringInfo(&cmd);
1487 peter@eisentraut.org 806 : 169 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
807 : : " FROM pg_catalog.pg_class c"
808 : : " INNER JOIN pg_catalog.pg_namespace n"
809 : : " ON (c.relnamespace = n.oid)"
810 : : " WHERE n.nspname = %s"
811 : : " AND c.relname = %s",
812 : : quote_literal_cstr(nspname),
813 : : quote_literal_cstr(relname));
1068 alvherre@alvh.no-ip. 814 : 169 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
815 : : lengthof(tableRow), tableRow);
816 : :
2579 peter_e@gmx.net 817 [ - + ]: 168 : if (res->status != WALRCV_OK_TUPLES)
2579 peter_e@gmx.net 818 [ # # ]:UBC 0 : ereport(ERROR,
819 : : (errcode(ERRCODE_CONNECTION_FAILURE),
820 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
821 : : nspname, relname, res->err)));
822 : :
1977 andres@anarazel.de 823 :CBC 168 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2579 peter_e@gmx.net 824 [ - + ]: 168 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2579 peter_e@gmx.net 825 [ # # ]:UBC 0 : ereport(ERROR,
826 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
827 : : errmsg("table \"%s.%s\" not found on publisher",
828 : : nspname, relname)));
829 : :
2579 peter_e@gmx.net 830 :CBC 168 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
831 [ - + ]: 168 : Assert(!isnull);
832 : 168 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
833 [ - + ]: 168 : Assert(!isnull);
1487 peter@eisentraut.org 834 : 168 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
835 [ - + ]: 168 : Assert(!isnull);
836 : :
2579 peter_e@gmx.net 837 : 168 : ExecDropSingleTupleTableSlot(slot);
838 : 168 : walrcv_clear_result(res);
839 : :
840 : :
841 : : /*
842 : : * Get column lists for each relation.
843 : : *
844 : : * We need to do this before fetching info about column names and types,
845 : : * so that we can skip columns that should not be replicated.
846 : : */
750 tomas.vondra@postgre 847 [ + - ]: 168 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
848 : : {
849 : : WalRcvExecResult *pubres;
850 : : TupleTableSlot *tslot;
682 akapila@postgresql.o 851 : 168 : Oid attrsRow[] = {INT2VECTOROID};
852 : : StringInfoData pub_names;
853 : :
750 tomas.vondra@postgre 854 : 168 : initStringInfo(&pub_names);
855 [ + - + + : 522 : foreach(lc, MySubscription->publications)
+ + ]
856 : : {
603 drowley@postgresql.o 857 [ + + ]: 354 : if (foreach_current_index(lc) > 0)
586 858 : 186 : appendStringInfoString(&pub_names, ", ");
750 tomas.vondra@postgre 859 : 354 : appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
860 : : }
861 : :
862 : : /*
863 : : * Fetch info about column lists for the relation (from all the
864 : : * publications).
865 : : */
866 : 168 : resetStringInfo(&cmd);
867 : 168 : appendStringInfo(&cmd,
868 : : "SELECT DISTINCT"
869 : : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
870 : : " THEN NULL ELSE gpt.attrs END)"
871 : : " FROM pg_publication p,"
872 : : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
873 : : " pg_class c"
874 : : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
875 : : " AND p.pubname IN ( %s )",
876 : : lrel->remoteid,
877 : : pub_names.data);
878 : :
879 : 168 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
880 : : lengthof(attrsRow), attrsRow);
881 : :
882 [ - + ]: 168 : if (pubres->status != WALRCV_OK_TUPLES)
750 tomas.vondra@postgre 883 [ # # ]:UBC 0 : ereport(ERROR,
884 : : (errcode(ERRCODE_CONNECTION_FAILURE),
885 : : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
886 : : nspname, relname, pubres->err)));
887 : :
888 : : /*
889 : : * We don't support the case where the column list is different for
890 : : * the same table when combining publications. See comments atop
891 : : * fetch_table_list. So there should be only one row returned.
892 : : * Although we already checked this when creating the subscription, we
893 : : * still need to check here in case the column list was changed after
894 : : * creating the subscription and before the sync worker is started.
895 : : */
682 akapila@postgresql.o 896 [ - + ]:CBC 168 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
682 akapila@postgresql.o 897 [ # # ]:UBC 0 : ereport(ERROR,
898 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
899 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
900 : : nspname, relname));
901 : :
902 : : /*
903 : : * Get the column list and build a single bitmap with the attnums.
904 : : *
905 : : * If we find a NULL value, it means all the columns should be
906 : : * replicated.
907 : : */
603 drowley@postgresql.o 908 :CBC 168 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
909 [ + - ]: 168 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
910 : : {
911 : 168 : Datum cfval = slot_getattr(tslot, 1, &isnull);
912 : :
682 akapila@postgresql.o 913 [ + + ]: 168 : if (!isnull)
914 : : {
915 : : ArrayType *arr;
916 : : int nelems;
917 : : int16 *elems;
918 : :
919 : 19 : arr = DatumGetArrayTypeP(cfval);
920 : 19 : nelems = ARR_DIMS(arr)[0];
921 [ - + ]: 19 : elems = (int16 *) ARR_DATA_PTR(arr);
922 : :
923 [ + + ]: 53 : for (natt = 0; natt < nelems; natt++)
924 : 34 : included_cols = bms_add_member(included_cols, elems[natt]);
925 : : }
926 : :
603 drowley@postgresql.o 927 : 168 : ExecClearTuple(tslot);
928 : : }
929 : 168 : ExecDropSingleTupleTableSlot(tslot);
930 : :
750 tomas.vondra@postgre 931 : 168 : walrcv_clear_result(pubres);
932 : :
933 : 168 : pfree(pub_names.data);
934 : : }
935 : :
936 : : /*
937 : : * Now fetch column names and types.
938 : : */
2579 peter_e@gmx.net 939 : 168 : resetStringInfo(&cmd);
940 [ + - ]: 168 : appendStringInfo(&cmd,
941 : : "SELECT a.attnum,"
942 : : " a.attname,"
943 : : " a.atttypid,"
944 : : " a.attnum = ANY(i.indkey)"
945 : : " FROM pg_catalog.pg_attribute a"
946 : : " LEFT JOIN pg_catalog.pg_index i"
947 : : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
948 : : " WHERE a.attnum > 0::pg_catalog.int2"
949 : : " AND NOT a.attisdropped %s"
950 : : " AND a.attrelid = %u"
951 : : " ORDER BY a.attnum",
952 : : lrel->remoteid,
1068 alvherre@alvh.no-ip. 953 : 168 : (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
954 : : "AND a.attgenerated = ''" : ""),
955 : : lrel->remoteid);
956 : 168 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
957 : : lengthof(attrRow), attrRow);
958 : :
2579 peter_e@gmx.net 959 [ - + ]: 168 : if (res->status != WALRCV_OK_TUPLES)
2579 peter_e@gmx.net 960 [ # # ]:UBC 0 : ereport(ERROR,
961 : : (errcode(ERRCODE_CONNECTION_FAILURE),
962 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
963 : : nspname, relname, res->err)));
964 : :
965 : : /* We don't know the number of rows coming, so allocate enough space. */
2579 peter_e@gmx.net 966 :CBC 168 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
967 : 168 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
968 : 168 : lrel->attkeys = NULL;
969 : :
970 : : /*
971 : : * Store the columns as a list of names. Ignore those that are not
972 : : * present in the column list, if there is one.
973 : : */
974 : 168 : natt = 0;
1977 andres@anarazel.de 975 : 168 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2579 peter_e@gmx.net 976 [ + + ]: 459 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
977 : : {
978 : : char *rel_colname;
979 : : AttrNumber attnum;
980 : :
750 tomas.vondra@postgre 981 : 291 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
982 [ - + ]: 291 : Assert(!isnull);
983 : :
984 : : /* If the column is not in the column list, skip it. */
985 [ + + + + ]: 291 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
986 : : {
987 : 22 : ExecClearTuple(slot);
988 : 22 : continue;
989 : : }
990 : :
991 : 269 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2579 peter_e@gmx.net 992 [ - + ]: 269 : Assert(!isnull);
993 : :
750 tomas.vondra@postgre 994 : 269 : lrel->attnames[natt] = rel_colname;
995 : 269 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
2579 peter_e@gmx.net 996 [ - + ]: 269 : Assert(!isnull);
997 : :
750 tomas.vondra@postgre 998 [ + + ]: 269 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
2579 peter_e@gmx.net 999 : 97 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1000 : :
1001 : : /* Should never happen. */
1002 [ - + ]: 269 : if (++natt >= MaxTupleAttributeNumber)
2579 peter_e@gmx.net 1003 [ # # ]:UBC 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
1004 : : nspname, relname);
1005 : :
2579 peter_e@gmx.net 1006 :CBC 269 : ExecClearTuple(slot);
1007 : : }
1008 : 168 : ExecDropSingleTupleTableSlot(slot);
1009 : :
1010 : 168 : lrel->natts = natt;
1011 : :
1012 : 168 : walrcv_clear_result(res);
1013 : :
1014 : : /*
1015 : : * Get relation's row filter expressions. DISTINCT avoids the same
1016 : : * expression of a table in multiple publications from being included
1017 : : * multiple times in the final expression.
1018 : : *
1019 : : * We need to copy the row even if it matches just one of the
1020 : : * publications, so we later combine all the quals with OR.
1021 : : *
1022 : : * For initial synchronization, row filtering can be ignored in following
1023 : : * cases:
1024 : : *
1025 : : * 1) one of the subscribed publications for the table hasn't specified
1026 : : * any row filter
1027 : : *
1028 : : * 2) one of the subscribed publications has puballtables set to true
1029 : : *
1030 : : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1031 : : * that includes this relation
1032 : : */
782 akapila@postgresql.o 1033 [ + - ]: 168 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
1034 : : {
1035 : : StringInfoData pub_names;
1036 : :
1037 : : /* Build the pubname list. */
1038 : 168 : initStringInfo(&pub_names);
101 nathan@postgresql.or 1039 [ + - + + :GNC 690 : foreach_node(String, pubstr, MySubscription->publications)
+ + ]
1040 : : {
1041 : 354 : char *pubname = strVal(pubstr);
1042 : :
1043 [ + + ]: 354 : if (foreach_current_index(pubstr) > 0)
782 akapila@postgresql.o 1044 :CBC 186 : appendStringInfoString(&pub_names, ", ");
1045 : :
1046 : 354 : appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
1047 : : }
1048 : :
1049 : : /* Check for row filters. */
1050 : 168 : resetStringInfo(&cmd);
1051 : 168 : appendStringInfo(&cmd,
1052 : : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1053 : : " FROM pg_publication p,"
1054 : : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1055 : : " WHERE gpt.relid = %u"
1056 : : " AND p.pubname IN ( %s )",
1057 : : lrel->remoteid,
1058 : : pub_names.data);
1059 : :
1060 : 168 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1061 : :
1062 [ - + ]: 168 : if (res->status != WALRCV_OK_TUPLES)
782 akapila@postgresql.o 1063 [ # # ]:UBC 0 : ereport(ERROR,
1064 : : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1065 : : nspname, relname, res->err)));
1066 : :
1067 : : /*
1068 : : * Multiple row filter expressions for the same table will be combined
1069 : : * by COPY using OR. If any of the filter expressions for this table
1070 : : * are null, it means the whole table will be copied. In this case it
1071 : : * is not necessary to construct a unified row filter expression at
1072 : : * all.
1073 : : */
782 akapila@postgresql.o 1074 :CBC 168 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1075 [ + + ]: 182 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1076 : : {
1077 : 172 : Datum rf = slot_getattr(slot, 1, &isnull);
1078 : :
1079 [ + + ]: 172 : if (!isnull)
1080 : 14 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1081 : : else
1082 : : {
1083 : : /* Ignore filters and cleanup as necessary. */
1084 [ + + ]: 158 : if (*qual)
1085 : : {
1086 : 3 : list_free_deep(*qual);
1087 : 3 : *qual = NIL;
1088 : : }
1089 : 158 : break;
1090 : : }
1091 : :
1092 : 14 : ExecClearTuple(slot);
1093 : : }
1094 : 168 : ExecDropSingleTupleTableSlot(slot);
1095 : :
1096 : 168 : walrcv_clear_result(res);
1097 : : }
1098 : :
2579 peter_e@gmx.net 1099 : 168 : pfree(cmd.data);
1100 : 168 : }
1101 : :
1102 : : /*
1103 : : * Copy existing data of a table from publisher.
1104 : : *
1105 : : * Caller is responsible for locking the local relation.
1106 : : */
1107 : : static void
1108 : 169 : copy_table(Relation rel)
1109 : : {
1110 : : LogicalRepRelMapEntry *relmapentry;
1111 : : LogicalRepRelation lrel;
782 akapila@postgresql.o 1112 : 169 : List *qual = NIL;
1113 : : WalRcvExecResult *res;
1114 : : StringInfoData cmd;
1115 : : CopyFromState cstate;
1116 : : List *attnamelist;
1117 : : ParseState *pstate;
388 1118 : 169 : List *options = NIL;
1119 : :
1120 : : /* Get the publisher relation info. */
2579 peter_e@gmx.net 1121 : 169 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
782 akapila@postgresql.o 1122 : 169 : RelationGetRelationName(rel), &lrel, &qual);
1123 : :
1124 : : /* Put the relation into relmap. */
2579 peter_e@gmx.net 1125 : 168 : logicalrep_relmap_update(&lrel);
1126 : :
1127 : : /* Map the publisher relation to local one. */
1128 : 168 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1129 [ - + ]: 168 : Assert(rel == relmapentry->localrel);
1130 : :
1131 : : /* Start copy on the publisher. */
1132 : 168 : initStringInfo(&cmd);
1133 : :
1134 : : /* Regular table with no row filter */
782 akapila@postgresql.o 1135 [ + + + + ]: 168 : if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1136 : : {
144 1137 : 144 : appendStringInfo(&cmd, "COPY %s",
1487 peter@eisentraut.org 1138 : 144 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1139 : :
1140 : : /* If the table has columns, then specify the columns */
144 akapila@postgresql.o 1141 [ + + ]: 144 : if (lrel.natts)
1142 : : {
1143 : 143 : appendStringInfoString(&cmd, " (");
1144 : :
1145 : : /*
1146 : : * XXX Do we need to list the columns in all cases? Maybe we're
1147 : : * replicating all columns?
1148 : : */
1149 [ + + ]: 376 : for (int i = 0; i < lrel.natts; i++)
1150 : : {
1151 [ + + ]: 233 : if (i > 0)
1152 : 90 : appendStringInfoString(&cmd, ", ");
1153 : :
1154 : 233 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1155 : : }
1156 : :
4 drowley@postgresql.o 1157 :GNC 143 : appendStringInfoChar(&cmd, ')');
1158 : : }
1159 : :
144 akapila@postgresql.o 1160 :CBC 144 : appendStringInfoString(&cmd, " TO STDOUT");
1161 : : }
1162 : : else
1163 : : {
1164 : : /*
1165 : : * For non-tables and tables with row filters, we need to do COPY
1166 : : * (SELECT ...), but we can't just do SELECT * because we need to not
1167 : : * copy generated columns. For tables with any row filters, build a
1168 : : * SELECT query with OR'ed row filters for COPY.
1169 : : */
1277 drowley@postgresql.o 1170 : 24 : appendStringInfoString(&cmd, "COPY (SELECT ");
1487 peter@eisentraut.org 1171 [ + + ]: 60 : for (int i = 0; i < lrel.natts; i++)
1172 : : {
1173 : 36 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1174 [ + + ]: 36 : if (i < lrel.natts - 1)
1175 : 12 : appendStringInfoString(&cmd, ", ");
1176 : : }
1177 : :
782 akapila@postgresql.o 1178 : 24 : appendStringInfoString(&cmd, " FROM ");
1179 : :
1180 : : /*
1181 : : * For regular tables, make sure we don't copy data from a child that
1182 : : * inherits the named table as those will be copied separately.
1183 : : */
1184 [ + + ]: 24 : if (lrel.relkind == RELKIND_RELATION)
1185 : 7 : appendStringInfoString(&cmd, "ONLY ");
1186 : :
1187 : 24 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1188 : : /* list of OR'ed filters */
1189 [ + + ]: 24 : if (qual != NIL)
1190 : : {
1191 : : ListCell *lc;
1192 : 10 : char *q = strVal(linitial(qual));
1193 : :
1194 : 10 : appendStringInfo(&cmd, " WHERE %s", q);
1195 [ + - + + : 11 : for_each_from(lc, qual, 1)
+ + ]
1196 : : {
1197 : 1 : q = strVal(lfirst(lc));
1198 : 1 : appendStringInfo(&cmd, " OR %s", q);
1199 : : }
1200 : 10 : list_free_deep(qual);
1201 : : }
1202 : :
1203 : 24 : appendStringInfoString(&cmd, ") TO STDOUT");
1204 : : }
1205 : :
1206 : : /*
1207 : : * Prior to v16, initial table synchronization will use text format even
1208 : : * if the binary option is enabled for a subscription.
1209 : : */
388 1210 [ + - ]: 168 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1211 [ + + ]: 168 : MySubscription->binary)
1212 : : {
1213 : 5 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1214 : 5 : options = list_make1(makeDefElem("format",
1215 : : (Node *) makeString("binary"), -1));
1216 : : }
1217 : :
1068 alvherre@alvh.no-ip. 1218 : 168 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
2579 peter_e@gmx.net 1219 : 168 : pfree(cmd.data);
1220 [ - + ]: 168 : if (res->status != WALRCV_OK_COPY_OUT)
2579 peter_e@gmx.net 1221 [ # # ]:UBC 0 : ereport(ERROR,
1222 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1223 : : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1224 : : lrel.nspname, lrel.relname, res->err)));
2579 peter_e@gmx.net 1225 :CBC 168 : walrcv_clear_result(res);
1226 : :
1227 : 168 : copybuf = makeStringInfo();
1228 : :
2554 1229 : 168 : pstate = make_parsestate(NULL);
1564 tgl@sss.pgh.pa.us 1230 : 168 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1231 : : NULL, false, false);
1232 : :
2579 peter_e@gmx.net 1233 : 168 : attnamelist = make_copy_attnamelist(relmapentry);
388 akapila@postgresql.o 1234 : 168 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1235 : :
1236 : : /* Do the copy */
2579 peter_e@gmx.net 1237 : 167 : (void) CopyFrom(cstate);
1238 : :
1239 : 160 : logicalrep_rel_close(relmapentry, NoLock);
1240 : 160 : }
1241 : :
1242 : : /*
1243 : : * Determine the tablesync slot name.
1244 : : *
1245 : : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1246 : : * on slot name length. We append system_identifier to avoid slot_name
1247 : : * collision with subscriptions in other clusters. With the current scheme
1248 : : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1249 : : * length of slot_name will be 50.
1250 : : *
1251 : : * The returned slot name is stored in the supplied buffer (syncslotname) with
1252 : : * the given size.
1253 : : *
1254 : : * Note: We don't use the subscription slot name as part of tablesync slot name
1255 : : * because we are responsible for cleaning up these slots and it could become
1256 : : * impossible to recalculate what name to cleanup if the subscription slot name
1257 : : * had changed.
1258 : : */
1259 : : void
1157 akapila@postgresql.o 1260 : 331 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1261 : : char *syncslotname, Size szslot)
1262 : : {
1154 1263 : 331 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1264 : : relid, GetSystemIdentifier());
1157 1265 : 331 : }
1266 : :
1267 : : /*
1268 : : * Start syncing the table in the sync worker.
1269 : : *
1270 : : * If nothing needs to be done to sync the table, we exit the worker without
1271 : : * any further action.
1272 : : *
1273 : : * The returned slot name is palloc'ed in current memory context.
1274 : : */
1275 : : static char *
2579 peter_e@gmx.net 1276 : 169 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1277 : : {
1278 : : char *slotname;
1279 : : char *err;
1280 : : char relstate;
1281 : : XLogRecPtr relstate_lsn;
1282 : : Relation rel;
1283 : : AclResult aclresult;
1284 : : WalRcvExecResult *res;
1285 : : char originname[NAMEDATALEN];
1286 : : RepOriginId originid;
1287 : : UserContext ucxt;
1288 : : bool must_use_password;
1289 : : bool run_as_owner;
1290 : :
1291 : : /* Check the state of the table synchronization. */
1292 : 169 : StartTransactionCommand();
2551 fujii@postgresql.org 1293 : 169 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1294 : 169 : MyLogicalRepWorker->relid,
1295 : : &relstate_lsn);
180 akapila@postgresql.o 1296 :GNC 169 : CommitTransactionCommand();
1297 : :
1298 : : /* Is the use of a password mandatory? */
304 akapila@postgresql.o 1299 [ + + ]:CBC 334 : must_use_password = MySubscription->passwordrequired &&
180 akapila@postgresql.o 1300 [ - + ]:GNC 165 : !MySubscription->ownersuperuser;
1301 : :
2579 peter_e@gmx.net 1302 [ - + ]:CBC 169 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
2551 fujii@postgresql.org 1303 : 169 : MyLogicalRepWorker->relstate = relstate;
1304 : 169 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
2579 peter_e@gmx.net 1305 : 169 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1306 : :
1307 : : /*
1308 : : * If synchronization is already done or no longer necessary, exit now
1309 : : * that we've updated shared memory state.
1310 : : */
1277 alvherre@alvh.no-ip. 1311 [ - + ]: 169 : switch (relstate)
1312 : : {
1277 alvherre@alvh.no-ip. 1313 :UBC 0 : case SUBREL_STATE_SYNCDONE:
1314 : : case SUBREL_STATE_READY:
1315 : : case SUBREL_STATE_UNKNOWN:
1316 : 0 : finish_sync_worker(); /* doesn't return */
1317 : : }
1318 : :
1319 : : /* Calculate the name of the tablesync slot. */
1154 akapila@postgresql.o 1320 :CBC 169 : slotname = (char *) palloc(NAMEDATALEN);
1321 : 169 : ReplicationSlotNameForTablesync(MySubscription->oid,
1322 : 169 : MyLogicalRepWorker->relid,
1323 : : slotname,
1324 : : NAMEDATALEN);
1325 : :
1326 : : /*
1327 : : * Here we use the slot name instead of the subscription name as the
1328 : : * application_name, so that it is different from the leader apply worker,
1329 : : * so that synchronous replication can distinguish them.
1330 : : */
1068 alvherre@alvh.no-ip. 1331 : 169 : LogRepWorkerWalRcvConn =
69 akapila@postgresql.o 1332 :GNC 169 : walrcv_connect(MySubscription->conninfo, true, true,
1333 : : must_use_password,
1334 : : slotname, &err);
1068 alvherre@alvh.no-ip. 1335 [ - + ]:CBC 169 : if (LogRepWorkerWalRcvConn == NULL)
2579 peter_e@gmx.net 1336 [ # # ]:UBC 0 : ereport(ERROR,
1337 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1338 : : errmsg("could not connect to the publisher: %s", err)));
1339 : :
1277 alvherre@alvh.no-ip. 1340 [ + + - + :CBC 169 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
- - ]
1341 : : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1342 : : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1343 : :
1344 : : /* Assign the origin tracking record name. */
551 akapila@postgresql.o 1345 : 169 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1346 : 169 : MyLogicalRepWorker->relid,
1347 : : originname,
1348 : : sizeof(originname));
1349 : :
1157 1350 [ + + ]: 169 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1351 : : {
1352 : : /*
1353 : : * We have previously errored out before finishing the copy so the
1354 : : * replication slot might exist. We want to remove the slot if it
1355 : : * already exists and proceed.
1356 : : *
1357 : : * XXX We could also instead try to drop the slot, last time we failed
1358 : : * but for that, we might need to clean up the copy state as it might
1359 : : * be in the middle of fetching the rows. Also, if there is a network
1360 : : * breakdown then it wouldn't have succeeded so trying it next time
1361 : : * seems like a better bet.
1362 : : */
1068 alvherre@alvh.no-ip. 1363 : 7 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1364 : : }
1157 akapila@postgresql.o 1365 [ - + ]: 162 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1366 : : {
1367 : : /*
1368 : : * The COPY phase was previously done, but tablesync then crashed
1369 : : * before it was able to finish normally.
1370 : : */
1157 akapila@postgresql.o 1371 :UBC 0 : StartTransactionCommand();
1372 : :
1373 : : /*
1374 : : * The origin tracking name must already exist. It was created first
1375 : : * time this tablesync was launched.
1376 : : */
1377 : 0 : originid = replorigin_by_name(originname, false);
461 1378 : 0 : replorigin_session_setup(originid, 0);
1157 1379 : 0 : replorigin_session_origin = originid;
1380 : 0 : *origin_startpos = replorigin_session_get_progress(false);
1381 : :
1382 : 0 : CommitTransactionCommand();
1383 : :
1384 : 0 : goto copy_table_done;
1385 : : }
1386 : :
1277 alvherre@alvh.no-ip. 1387 [ - + ]:CBC 169 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1388 : 169 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1389 : 169 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1390 : 169 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1391 : :
1392 : : /* Update the state and make it visible to others. */
1393 : 169 : StartTransactionCommand();
1394 : 169 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1395 : 169 : MyLogicalRepWorker->relid,
1396 : 169 : MyLogicalRepWorker->relstate,
1397 : 169 : MyLogicalRepWorker->relstate_lsn);
1398 : 169 : CommitTransactionCommand();
739 andres@anarazel.de 1399 : 169 : pgstat_report_stat(true);
1400 : :
1277 alvherre@alvh.no-ip. 1401 : 169 : StartTransactionCommand();
1402 : :
1403 : : /*
1404 : : * Use a standard write lock here. It might be better to disallow access
1405 : : * to the table while it's being synchronized. But we don't want to block
1406 : : * the main apply process from working and it has to open the relation in
1407 : : * RowExclusiveLock when remapping remote relation id to local one.
1408 : : */
1409 : 169 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1410 : :
1411 : : /*
1412 : : * Start a transaction in the remote node in REPEATABLE READ mode. This
1413 : : * ensures that both the replication slot we create (see below) and the
1414 : : * COPY are consistent with each other.
1415 : : */
1068 1416 : 169 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1417 : : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1418 : : 0, NULL);
1277 1419 [ - + ]: 169 : if (res->status != WALRCV_OK_COMMAND)
1277 alvherre@alvh.no-ip. 1420 [ # # ]:UBC 0 : ereport(ERROR,
1421 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1422 : : errmsg("table copy could not start transaction on publisher: %s",
1423 : : res->err)));
1277 alvherre@alvh.no-ip. 1424 :CBC 169 : walrcv_clear_result(res);
1425 : :
1426 : : /*
1427 : : * Create a new permanent logical decoding slot. This slot will be used
1428 : : * for the catchup phase after COPY is done, so tell it to use the
1429 : : * snapshot to make the final data consistent.
1430 : : */
1005 akapila@postgresql.o 1431 : 169 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1432 : : slotname, false /* permanent */ , false /* two_phase */ ,
1433 : : MySubscription->failover,
1434 : : CRS_USE_SNAPSHOT, origin_startpos);
1435 : :
1436 : : /*
1437 : : * Setup replication origin tracking. The purpose of doing this before the
1438 : : * copy is to avoid doing the copy again due to any error in setting up
1439 : : * origin tracking.
1440 : : */
1157 1441 : 169 : originid = replorigin_by_name(originname, true);
1442 [ + - ]: 169 : if (!OidIsValid(originid))
1443 : : {
1444 : : /*
1445 : : * Origin tracking does not exist, so create it now.
1446 : : *
1447 : : * Then advance to the LSN got from walrcv_create_slot. This is WAL
1448 : : * logged for the purpose of recovery. Locks are to prevent the
1449 : : * replication origin from vanishing while advancing.
1450 : : */
1451 : 169 : originid = replorigin_create(originname);
1452 : :
1453 : 169 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1454 : 169 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1455 : : true /* go backward */ , true /* WAL log */ );
1456 : 169 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1457 : :
461 1458 : 169 : replorigin_session_setup(originid, 0);
1157 1459 : 169 : replorigin_session_origin = originid;
1460 : : }
1461 : : else
1462 : : {
1157 akapila@postgresql.o 1463 [ # # ]:UBC 0 : ereport(ERROR,
1464 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1465 : : errmsg("replication origin \"%s\" already exists",
1466 : : originname)));
1467 : : }
1468 : :
1469 : : /*
1470 : : * Make sure that the copy command runs as the table owner, unless the
1471 : : * user has opted out of that behaviour.
1472 : : */
310 msawada@postgresql.o 1473 :CBC 169 : run_as_owner = MySubscription->runasowner;
1474 [ + + ]: 169 : if (!run_as_owner)
1475 : 168 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1476 : :
1477 : : /*
1478 : : * Check that our table sync worker has permission to insert into the
1479 : : * target table.
1480 : : */
1481 : 169 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1482 : : ACL_INSERT);
1483 [ - + ]: 169 : if (aclresult != ACLCHECK_OK)
310 msawada@postgresql.o 1484 :UBC 0 : aclcheck_error(aclresult,
1485 : 0 : get_relkind_objtype(rel->rd_rel->relkind),
1486 : 0 : RelationGetRelationName(rel));
1487 : :
1488 : : /*
1489 : : * COPY FROM does not honor RLS policies. That is not a problem for
1490 : : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1491 : : * who has it implicitly), but other roles should not be able to
1492 : : * circumvent RLS. Disallow logical replication into RLS enabled
1493 : : * relations for such roles.
1494 : : */
310 msawada@postgresql.o 1495 [ - + ]:CBC 169 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
310 msawada@postgresql.o 1496 [ # # ]:UBC 0 : ereport(ERROR,
1497 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1498 : : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1499 : : GetUserNameFromId(GetUserId(), true),
1500 : : RelationGetRelationName(rel))));
1501 : :
1502 : : /* Now do the initial data copy */
738 tomas.vondra@postgre 1503 :CBC 169 : PushActiveSnapshot(GetTransactionSnapshot());
1504 : 169 : copy_table(rel);
1505 : 160 : PopActiveSnapshot();
1506 : :
1068 alvherre@alvh.no-ip. 1507 : 160 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1277 1508 [ - + ]: 160 : if (res->status != WALRCV_OK_COMMAND)
1277 alvherre@alvh.no-ip. 1509 [ # # ]:UBC 0 : ereport(ERROR,
1510 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1511 : : errmsg("table copy could not finish transaction on publisher: %s",
1512 : : res->err)));
1277 alvherre@alvh.no-ip. 1513 :CBC 160 : walrcv_clear_result(res);
1514 : :
299 tgl@sss.pgh.pa.us 1515 [ + + ]: 160 : if (!run_as_owner)
310 msawada@postgresql.o 1516 : 159 : RestoreUserContext(&ucxt);
1517 : :
1277 alvherre@alvh.no-ip. 1518 : 160 : table_close(rel, NoLock);
1519 : :
1520 : : /* Make the copy visible. */
1521 : 160 : CommandCounterIncrement();
1522 : :
1523 : : /*
1524 : : * Update the persisted state to indicate the COPY phase is done; make it
1525 : : * visible to others.
1526 : : */
1157 akapila@postgresql.o 1527 : 160 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1528 : 160 : MyLogicalRepWorker->relid,
1529 : : SUBREL_STATE_FINISHEDCOPY,
1530 : 160 : MyLogicalRepWorker->relstate_lsn);
1531 : :
1532 : 160 : CommitTransactionCommand();
1533 : :
1534 : 160 : copy_table_done:
1535 : :
1536 [ - + ]: 160 : elog(DEBUG1,
1537 : : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1538 : : originname, LSN_FORMAT_ARGS(*origin_startpos));
1539 : :
1540 : : /*
1541 : : * We are done with the initial data synchronization, update the state.
1542 : : */
1277 alvherre@alvh.no-ip. 1543 [ - + ]: 160 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1544 : 160 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1545 : 160 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1546 : 160 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1547 : :
1548 : : /*
1549 : : * Finally, wait until the leader apply worker tells us to catch up and
1550 : : * then return to let LogicalRepApplyLoop do it.
1551 : : */
1552 : 160 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
2579 peter_e@gmx.net 1553 : 160 : return slotname;
1554 : : }
1555 : :
1556 : : /*
1557 : : * Common code to fetch the up-to-date sync state info into the static lists.
1558 : : *
1559 : : * Returns true if subscription has 1 or more tables, else false.
1560 : : *
1561 : : * Note: If this function started the transaction (indicated by the parameter)
1562 : : * then it is the caller's responsibility to commit it.
1563 : : */
1564 : : static bool
1005 akapila@postgresql.o 1565 : 4456 : FetchTableStates(bool *started_tx)
1566 : : {
1567 : : static bool has_subrels = false;
1568 : :
1569 : 4456 : *started_tx = false;
1570 : :
1571 [ + + ]: 4456 : if (!table_states_valid)
1572 : : {
1573 : : MemoryContext oldctx;
1574 : : List *rstates;
1575 : : ListCell *lc;
1576 : : SubscriptionRelState *rstate;
1577 : :
1578 : : /* Clean the old lists. */
1579 : 736 : list_free_deep(table_states_not_ready);
1580 : 736 : table_states_not_ready = NIL;
1581 : :
1582 [ + + ]: 736 : if (!IsTransactionState())
1583 : : {
1584 : 720 : StartTransactionCommand();
1585 : 720 : *started_tx = true;
1586 : : }
1587 : :
1588 : : /* Fetch all non-ready tables. */
627 michael@paquier.xyz 1589 : 736 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1590 : :
1591 : : /* Allocate the tracking info in a permanent memory context. */
1005 akapila@postgresql.o 1592 : 736 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1593 [ + + + + : 1946 : foreach(lc, rstates)
+ + ]
1594 : : {
1595 : 1210 : rstate = palloc(sizeof(SubscriptionRelState));
1596 : 1210 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1597 : 1210 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1598 : : }
1599 : 736 : MemoryContextSwitchTo(oldctx);
1600 : :
1601 : : /*
1602 : : * Does the subscription have tables?
1603 : : *
1604 : : * If there were not-READY relations found then we know it does. But
1605 : : * if table_states_not_ready was empty we still need to check again to
1606 : : * see if there are 0 tables.
1607 : : */
606 tgl@sss.pgh.pa.us 1608 [ + + + + ]: 941 : has_subrels = (table_states_not_ready != NIL) ||
1005 akapila@postgresql.o 1609 : 205 : HasSubscriptionRelations(MySubscription->oid);
1610 : :
1611 : 736 : table_states_valid = true;
1612 : : }
1613 : :
1614 : 4456 : return has_subrels;
1615 : : }
1616 : :
1617 : : /*
1618 : : * Execute the initial sync with error handling. Disable the subscription,
1619 : : * if it's required.
1620 : : *
1621 : : * Allocate the slot name in long-lived context on return. Note that we don't
1622 : : * handle FATAL errors which are probably because of system resource error and
1623 : : * are not repeatable.
1624 : : */
1625 : : static void
255 akapila@postgresql.o 1626 :GNC 169 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1627 : : {
1628 : 169 : char *sync_slotname = NULL;
1629 : :
1630 [ - + ]: 169 : Assert(am_tablesync_worker());
1631 : :
1632 [ + + ]: 169 : PG_TRY();
1633 : : {
1634 : : /* Call initial sync. */
1635 : 169 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1636 : : }
1637 : 8 : PG_CATCH();
1638 : : {
1639 [ + + ]: 8 : if (MySubscription->disableonerr)
1640 : 1 : DisableSubscriptionAndExit();
1641 : : else
1642 : : {
1643 : : /*
1644 : : * Report the worker failed during table synchronization. Abort
1645 : : * the current transaction so that the stats message is sent in an
1646 : : * idle state.
1647 : : */
1648 : 7 : AbortOutOfAnyTransaction();
1649 : 7 : pgstat_report_subscription_error(MySubscription->oid, false);
1650 : :
1651 : 7 : PG_RE_THROW();
1652 : : }
1653 : : }
1654 [ - + ]: 160 : PG_END_TRY();
1655 : :
1656 : : /* allocate slot name in long-lived context */
1657 : 160 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1658 : 160 : pfree(sync_slotname);
1659 : 160 : }
1660 : :
1661 : : /*
1662 : : * Runs the tablesync worker.
1663 : : *
1664 : : * It starts syncing tables. After a successful sync, sets streaming options
1665 : : * and starts streaming to catchup with apply worker.
1666 : : */
1667 : : static void
1668 : 169 : run_tablesync_worker()
1669 : : {
1670 : : char originname[NAMEDATALEN];
1671 : 169 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1672 : 169 : char *slotname = NULL;
1673 : : WalRcvStreamOptions options;
1674 : :
1675 : 169 : start_table_sync(&origin_startpos, &slotname);
1676 : :
1677 : 160 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1678 : 160 : MyLogicalRepWorker->relid,
1679 : : originname,
1680 : : sizeof(originname));
1681 : :
1682 : 160 : set_apply_error_context_origin(originname);
1683 : :
1684 : 160 : set_stream_options(&options, slotname, &origin_startpos);
1685 : :
1686 : 160 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1687 : :
1688 : : /* Apply the changes till we catchup with the apply worker. */
1689 : 160 : start_apply(origin_startpos);
255 akapila@postgresql.o 1690 :UNC 0 : }
1691 : :
1692 : : /* Logical Replication Tablesync worker entry point */
1693 : : void
255 akapila@postgresql.o 1694 :GNC 169 : TablesyncWorkerMain(Datum main_arg)
1695 : : {
1696 : 169 : int worker_slot = DatumGetInt32(main_arg);
1697 : :
1698 : 169 : SetupApplyOrSyncWorker(worker_slot);
1699 : :
1700 : 169 : run_tablesync_worker();
1701 : :
255 akapila@postgresql.o 1702 :UNC 0 : finish_sync_worker();
1703 : : }
1704 : :
1705 : : /*
1706 : : * If the subscription has no tables then return false.
1707 : : *
1708 : : * Otherwise, are all tablesyncs READY?
1709 : : *
1710 : : * Note: This function is not suitable to be called from outside of apply or
1711 : : * tablesync workers because MySubscription needs to be already initialized.
1712 : : */
1713 : : bool
1005 akapila@postgresql.o 1714 :CBC 68 : AllTablesyncsReady(void)
1715 : : {
1716 : 68 : bool started_tx = false;
1717 : 68 : bool has_subrels = false;
1718 : :
1719 : : /* We need up-to-date sync state info for subscription tables here. */
1720 : 68 : has_subrels = FetchTableStates(&started_tx);
1721 : :
1722 [ + + ]: 68 : if (started_tx)
1723 : : {
1724 : 13 : CommitTransactionCommand();
739 andres@anarazel.de 1725 : 13 : pgstat_report_stat(true);
1726 : : }
1727 : :
1728 : : /*
1729 : : * Return false when there are no tables in subscription or not all tables
1730 : : * are in ready state; true otherwise.
1731 : : */
606 tgl@sss.pgh.pa.us 1732 [ + - + + ]: 68 : return has_subrels && (table_states_not_ready == NIL);
1733 : : }
1734 : :
1735 : : /*
1736 : : * Update the two_phase state of the specified subscription in pg_subscription.
1737 : : */
1738 : : void
1005 akapila@postgresql.o 1739 : 7 : UpdateTwoPhaseState(Oid suboid, char new_state)
1740 : : {
1741 : : Relation rel;
1742 : : HeapTuple tup;
1743 : : bool nulls[Natts_pg_subscription];
1744 : : bool replaces[Natts_pg_subscription];
1745 : : Datum values[Natts_pg_subscription];
1746 : :
1747 [ + - + - : 7 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
- + ]
1748 : : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1749 : : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1750 : :
1751 : 7 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1752 : 7 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1753 [ - + ]: 7 : if (!HeapTupleIsValid(tup))
1005 akapila@postgresql.o 1754 [ # # ]:UBC 0 : elog(ERROR,
1755 : : "cache lookup failed for subscription oid %u",
1756 : : suboid);
1757 : :
1758 : : /* Form a new tuple. */
1005 akapila@postgresql.o 1759 :CBC 7 : memset(values, 0, sizeof(values));
1760 : 7 : memset(nulls, false, sizeof(nulls));
1761 : 7 : memset(replaces, false, sizeof(replaces));
1762 : :
1763 : : /* And update/set two_phase state */
1764 : 7 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1765 : 7 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1766 : :
1767 : 7 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1768 : : values, nulls, replaces);
1769 : 7 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1770 : :
1771 : 7 : heap_freetuple(tup);
1772 : 7 : table_close(rel, RowExclusiveLock);
1773 : 7 : }
|