Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * launcher.c
3 : : * PostgreSQL logical replication worker launcher process
4 : : *
5 : : * Copyright (c) 2016-2024, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/launcher.c
9 : : *
10 : : * NOTES
11 : : * This module contains the logical replication worker launcher which
12 : : * uses the background worker infrastructure to start the logical
13 : : * replication workers for every enabled subscription.
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : :
18 : : #include "postgres.h"
19 : :
20 : : #include "access/heapam.h"
21 : : #include "access/htup.h"
22 : : #include "access/htup_details.h"
23 : : #include "access/tableam.h"
24 : : #include "access/xact.h"
25 : : #include "catalog/pg_subscription.h"
26 : : #include "catalog/pg_subscription_rel.h"
27 : : #include "funcapi.h"
28 : : #include "lib/dshash.h"
29 : : #include "miscadmin.h"
30 : : #include "pgstat.h"
31 : : #include "postmaster/bgworker.h"
32 : : #include "postmaster/interrupt.h"
33 : : #include "replication/logicallauncher.h"
34 : : #include "replication/slot.h"
35 : : #include "replication/walreceiver.h"
36 : : #include "replication/worker_internal.h"
37 : : #include "storage/ipc.h"
38 : : #include "storage/proc.h"
39 : : #include "storage/procarray.h"
40 : : #include "tcop/tcopprot.h"
41 : : #include "utils/builtins.h"
42 : : #include "utils/memutils.h"
43 : : #include "utils/pg_lsn.h"
44 : : #include "utils/snapmgr.h"
45 : :
46 : : /* max sleep time between cycles (3min) */
47 : : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
48 : :
49 : : /* GUC variables */
50 : : int max_logical_replication_workers = 4;
51 : : int max_sync_workers_per_subscription = 2;
52 : : int max_parallel_apply_workers_per_subscription = 2;
53 : :
54 : : LogicalRepWorker *MyLogicalRepWorker = NULL;
55 : :
56 : : typedef struct LogicalRepCtxStruct
57 : : {
58 : : /* Supervisor process. */
59 : : pid_t launcher_pid;
60 : :
61 : : /* Hash table holding last start times of subscriptions' apply workers. */
62 : : dsa_handle last_start_dsa;
63 : : dshash_table_handle last_start_dsh;
64 : :
65 : : /* Background workers. */
66 : : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
67 : : } LogicalRepCtxStruct;
68 : :
69 : : static LogicalRepCtxStruct *LogicalRepCtx;
70 : :
71 : : /* an entry in the last-start-times shared hash table */
72 : : typedef struct LauncherLastStartTimesEntry
73 : : {
74 : : Oid subid; /* OID of logrep subscription (hash key) */
75 : : TimestampTz last_start_time; /* last time its apply worker was started */
76 : : } LauncherLastStartTimesEntry;
77 : :
78 : : /* parameters for the last-start-times shared hash table */
79 : : static const dshash_parameters dsh_params = {
80 : : sizeof(Oid),
81 : : sizeof(LauncherLastStartTimesEntry),
82 : : dshash_memcmp,
83 : : dshash_memhash,
84 : : dshash_memcpy,
85 : : LWTRANCHE_LAUNCHER_HASH
86 : : };
87 : :
88 : : static dsa_area *last_start_times_dsa = NULL;
89 : : static dshash_table *last_start_times = NULL;
90 : :
91 : : static bool on_commit_launcher_wakeup = false;
92 : :
93 : :
94 : : static void ApplyLauncherWakeup(void);
95 : : static void logicalrep_launcher_onexit(int code, Datum arg);
96 : : static void logicalrep_worker_onexit(int code, Datum arg);
97 : : static void logicalrep_worker_detach(void);
98 : : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
99 : : static int logicalrep_pa_worker_count(Oid subid);
100 : : static void logicalrep_launcher_attach_dshmem(void);
101 : : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
102 : : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
103 : :
104 : :
105 : : /*
106 : : * Load the list of subscriptions.
107 : : *
108 : : * Only the fields interesting for worker start/stop functions are filled for
109 : : * each subscription.
110 : : */
111 : : static List *
2642 peter_e@gmx.net 112 :CBC 3124 : get_subscription_list(void)
113 : : {
114 : 3124 : List *res = NIL;
115 : : Relation rel;
116 : : TableScanDesc scan;
117 : : HeapTuple tup;
118 : : MemoryContext resultcxt;
119 : :
120 : : /* This is the context that we will allocate our output data in */
121 : 3124 : resultcxt = CurrentMemoryContext;
122 : :
123 : : /*
124 : : * Start a transaction so we can access pg_database, and get a snapshot.
125 : : * We don't have a use for the snapshot itself, but we're interested in
126 : : * the secondary effect that it sets RecentGlobalXmin. (This is critical
127 : : * for anything that reads heap pages, because HOT may decide to prune
128 : : * them even if the process doesn't attempt to modify any tuples.)
129 : : *
130 : : * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
131 : : * not pushed/active does not reliably prevent HOT pruning (->xmin could
132 : : * e.g. be cleared when cache invalidations are processed).
133 : : */
134 : 3124 : StartTransactionCommand();
135 : 3124 : (void) GetTransactionSnapshot();
136 : :
1910 andres@anarazel.de 137 : 3124 : rel = table_open(SubscriptionRelationId, AccessShareLock);
1861 138 : 3124 : scan = table_beginscan_catalog(rel, 0, NULL);
139 : :
2642 peter_e@gmx.net 140 [ + + ]: 4139 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
141 : : {
142 : 1015 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
143 : : Subscription *sub;
144 : : MemoryContext oldcxt;
145 : :
146 : : /*
147 : : * Allocate our results in the caller's context, not the
148 : : * transaction's. We do this inside the loop, and restore the original
149 : : * context at the end, so that leaky things like heap_getnext() are
150 : : * not called in a potentially long-lived context.
151 : : */
152 : 1015 : oldcxt = MemoryContextSwitchTo(resultcxt);
153 : :
2557 154 : 1015 : sub = (Subscription *) palloc0(sizeof(Subscription));
1972 andres@anarazel.de 155 : 1015 : sub->oid = subform->oid;
2642 peter_e@gmx.net 156 : 1015 : sub->dbid = subform->subdbid;
157 : 1015 : sub->owner = subform->subowner;
158 : 1015 : sub->enabled = subform->subenabled;
159 : 1015 : sub->name = pstrdup(NameStr(subform->subname));
160 : : /* We don't fill fields we are not interested in. */
161 : :
162 : 1015 : res = lappend(res, sub);
163 : 1015 : MemoryContextSwitchTo(oldcxt);
164 : : }
165 : :
1861 andres@anarazel.de 166 : 3124 : table_endscan(scan);
1910 167 : 3124 : table_close(rel, AccessShareLock);
168 : :
2642 peter_e@gmx.net 169 : 3124 : CommitTransactionCommand();
170 : :
171 : 3124 : return res;
172 : : }
173 : :
174 : : /*
175 : : * Wait for a background worker to start up and attach to the shmem context.
176 : : *
177 : : * This is only needed for cleaning up the shared memory in case the worker
178 : : * fails to attach.
179 : : *
180 : : * Returns whether the attach was successful.
181 : : */
182 : : static bool
183 : 455 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
184 : : uint16 generation,
185 : : BackgroundWorkerHandle *handle)
186 : : {
187 : : BgwHandleStatus status;
188 : : int rc;
189 : :
190 : : for (;;)
191 : 956 : {
192 : : pid_t pid;
193 : :
194 [ - + ]: 1411 : CHECK_FOR_INTERRUPTS();
195 : :
2545 196 : 1411 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 : :
198 : : /* Worker either died or has started. Return false if died. */
199 [ + + + + ]: 1411 : if (!worker->in_use || worker->proc)
200 : : {
201 : 453 : LWLockRelease(LogicalRepWorkerLock);
461 akapila@postgresql.o 202 : 453 : return worker->in_use;
203 : : }
204 : :
2545 peter_e@gmx.net 205 : 958 : LWLockRelease(LogicalRepWorkerLock);
206 : :
207 : : /* Check if worker has died before attaching, and clean up after it. */
2642 208 : 958 : status = GetBackgroundWorkerPid(handle, &pid);
209 : :
210 [ - + ]: 958 : if (status == BGWH_STOPPED)
211 : : {
2545 peter_e@gmx.net 212 :UBC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
213 : : /* Ensure that this was indeed the worker we waited for. */
214 [ # # ]: 0 : if (generation == worker->generation)
215 : 0 : logicalrep_worker_cleanup(worker);
216 : 0 : LWLockRelease(LogicalRepWorkerLock);
461 akapila@postgresql.o 217 : 0 : return false;
218 : : }
219 : :
220 : : /*
221 : : * We need timeout because we generally don't get notified via latch
222 : : * about the worker attach. But we don't expect to have to wait long.
223 : : */
2642 peter_e@gmx.net 224 :CBC 958 : rc = WaitLatch(MyLatch,
225 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
226 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
227 : :
2504 andres@anarazel.de 228 [ + + ]: 956 : if (rc & WL_LATCH_SET)
229 : : {
230 : 534 : ResetLatch(MyLatch);
231 [ - + ]: 534 : CHECK_FOR_INTERRUPTS();
232 : : }
233 : : }
234 : : }
235 : :
236 : : /*
237 : : * Walks the workers array and searches for one that matches given
238 : : * subscription id and relid.
239 : : *
240 : : * We are only interested in the leader apply worker or table sync worker.
241 : : */
242 : : LogicalRepWorker *
2579 peter_e@gmx.net 243 : 2818 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
244 : : {
245 : : int i;
2524 bruce@momjian.us 246 : 2818 : LogicalRepWorker *res = NULL;
247 : :
2642 peter_e@gmx.net 248 [ - + ]: 2818 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
249 : :
250 : : /* Search for attached worker for a given subscription id. */
251 [ + + ]: 10059 : for (i = 0; i < max_logical_replication_workers; i++)
252 : : {
2524 bruce@momjian.us 253 : 8719 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
254 : :
255 : : /* Skip parallel apply workers. */
461 akapila@postgresql.o 256 [ + + - + ]: 8719 : if (isParallelApplyWorker(w))
461 akapila@postgresql.o 257 :LBC (1434) : continue;
258 : :
2545 peter_e@gmx.net 259 [ + + + + :CBC 8719 : if (w->in_use && w->subid == subid && w->relid == relid &&
+ + ]
260 [ + + + - ]: 1478 : (!only_running || w->proc))
261 : : {
2642 262 : 1478 : res = w;
263 : 1478 : break;
264 : : }
265 : : }
266 : :
267 : 2818 : return res;
268 : : }
269 : :
270 : : /*
271 : : * Similar to logicalrep_worker_find(), but returns a list of all workers for
272 : : * the subscription, instead of just one.
273 : : */
274 : : List *
2445 275 : 524 : logicalrep_workers_find(Oid subid, bool only_running)
276 : : {
277 : : int i;
278 : 524 : List *res = NIL;
279 : :
280 [ - + ]: 524 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
281 : :
282 : : /* Search for attached worker for a given subscription id. */
283 [ + + ]: 2820 : for (i = 0; i < max_logical_replication_workers; i++)
284 : : {
285 : 2296 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
286 : :
287 [ + + + + : 2296 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ + + + ]
288 : 372 : res = lappend(res, w);
289 : : }
290 : :
291 : 524 : return res;
292 : : }
293 : :
294 : : /*
295 : : * Start new logical replication background worker, if possible.
296 : : *
297 : : * Returns true on success, false on failure.
298 : : */
299 : : bool
244 akapila@postgresql.o 300 :GNC 459 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
301 : : Oid dbid, Oid subid, const char *subname, Oid userid,
302 : : Oid relid, dsm_handle subworker_dsm)
303 : : {
304 : : BackgroundWorker bgw;
305 : : BackgroundWorkerHandle *bgw_handle;
306 : : uint16 generation;
307 : : int i;
2524 bruce@momjian.us 308 :CBC 459 : int slot = 0;
309 : 459 : LogicalRepWorker *worker = NULL;
310 : : int nsyncworkers;
311 : : int nparallelapplyworkers;
312 : : TimestampTz now;
244 akapila@postgresql.o 313 :GNC 459 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
314 : 459 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
315 : :
316 : : /*----------
317 : : * Sanity checks:
318 : : * - must be valid worker type
319 : : * - tablesync workers are only ones to have relid
320 : : * - parallel apply worker is the only kind of subworker
321 : : */
322 [ - + ]: 459 : Assert(wtype != WORKERTYPE_UNKNOWN);
323 [ - + ]: 459 : Assert(is_tablesync_worker == OidIsValid(relid));
324 [ - + ]: 459 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
325 : :
2517 peter_e@gmx.net 326 [ + + ]:CBC 459 : ereport(DEBUG1,
327 : : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
328 : : subname)));
329 : :
330 : : /* Report this after the initial starting message for consistency. */
2642 331 [ - + ]: 459 : if (max_replication_slots == 0)
2642 peter_e@gmx.net 332 [ # # ]:UBC 0 : ereport(ERROR,
333 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
334 : : errmsg("cannot start logical replication workers when max_replication_slots = 0")));
335 : :
336 : : /*
337 : : * We need to do the modification of the shared memory under lock so that
338 : : * we have consistent view.
339 : : */
2642 peter_e@gmx.net 340 :CBC 459 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
341 : :
2545 342 : 459 : retry:
343 : : /* Find unused worker slot. */
344 [ + - ]: 753 : for (i = 0; i < max_logical_replication_workers; i++)
345 : : {
346 : 753 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
347 : :
348 [ + + ]: 753 : if (!w->in_use)
349 : : {
350 : 459 : worker = w;
351 : 459 : slot = i;
2642 352 : 459 : break;
353 : : }
354 : : }
355 : :
2545 356 : 459 : nsyncworkers = logicalrep_sync_worker_count(subid);
357 : :
358 : 459 : now = GetCurrentTimestamp();
359 : :
360 : : /*
361 : : * If we didn't find a free slot, try to do garbage collection. The
362 : : * reason we do this is because if some worker failed to start up and its
363 : : * parent has crashed while waiting, the in_use state was never cleared.
364 : : */
365 [ + - - + ]: 459 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
366 : : {
2524 bruce@momjian.us 367 :UBC 0 : bool did_cleanup = false;
368 : :
2545 peter_e@gmx.net 369 [ # # ]: 0 : for (i = 0; i < max_logical_replication_workers; i++)
370 : : {
371 : 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
372 : :
373 : : /*
374 : : * If the worker was marked in use but didn't manage to attach in
375 : : * time, clean it up.
376 : : */
377 [ # # # # : 0 : if (w->in_use && !w->proc &&
# # ]
378 : 0 : TimestampDifferenceExceeds(w->launch_time, now,
379 : : wal_receiver_timeout))
380 : : {
381 [ # # ]: 0 : elog(WARNING,
382 : : "logical replication worker for subscription %u took too long to start; canceled",
383 : : w->subid);
384 : :
385 : 0 : logicalrep_worker_cleanup(w);
386 : 0 : did_cleanup = true;
387 : : }
388 : : }
389 : :
390 [ # # ]: 0 : if (did_cleanup)
391 : 0 : goto retry;
392 : : }
393 : :
394 : : /*
395 : : * We don't allow to invoke more sync workers once we have reached the
396 : : * sync worker limit per subscription. So, just return silently as we
397 : : * might get here because of an otherwise harmless race condition.
398 : : */
244 akapila@postgresql.o 399 [ + + - + ]:GNC 459 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
400 : : {
2545 peter_e@gmx.net 401 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
461 akapila@postgresql.o 402 : 0 : return false;
403 : : }
404 : :
461 akapila@postgresql.o 405 :CBC 459 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
406 : :
407 : : /*
408 : : * Return false if the number of parallel apply workers reached the limit
409 : : * per subscription.
410 : : */
411 [ + + ]: 459 : if (is_parallel_apply_worker &&
412 [ - + ]: 11 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
413 : : {
461 akapila@postgresql.o 414 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
415 : 0 : return false;
416 : : }
417 : :
418 : : /*
419 : : * However if there are no more free worker slots, inform user about it
420 : : * before exiting.
421 : : */
2642 peter_e@gmx.net 422 [ - + ]:CBC 459 : if (worker == NULL)
423 : : {
2637 fujii@postgresql.org 424 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
2642 peter_e@gmx.net 425 [ # # ]: 0 : ereport(WARNING,
426 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
427 : : errmsg("out of logical replication worker slots"),
428 : : errhint("You might need to increase %s.", "max_logical_replication_workers")));
461 akapila@postgresql.o 429 : 0 : return false;
430 : : }
431 : :
432 : : /* Prepare the worker slot. */
244 akapila@postgresql.o 433 :GNC 459 : worker->type = wtype;
2545 peter_e@gmx.net 434 :CBC 459 : worker->launch_time = now;
435 : 459 : worker->in_use = true;
436 : 459 : worker->generation++;
2579 437 : 459 : worker->proc = NULL;
2642 438 : 459 : worker->dbid = dbid;
439 : 459 : worker->userid = userid;
440 : 459 : worker->subid = subid;
2579 441 : 459 : worker->relid = relid;
442 : 459 : worker->relstate = SUBREL_STATE_UNKNOWN;
443 : 459 : worker->relstate_lsn = InvalidXLogRecPtr;
955 akapila@postgresql.o 444 : 459 : worker->stream_fileset = NULL;
452 445 [ + + ]: 459 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
461 446 : 459 : worker->parallel_apply = is_parallel_apply_worker;
2579 peter_e@gmx.net 447 : 459 : worker->last_lsn = InvalidXLogRecPtr;
448 : 459 : TIMESTAMP_NOBEGIN(worker->last_send_time);
449 : 459 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
450 : 459 : worker->reply_lsn = InvalidXLogRecPtr;
451 : 459 : TIMESTAMP_NOBEGIN(worker->reply_time);
452 : :
453 : : /* Before releasing lock, remember generation for future identification. */
2400 tgl@sss.pgh.pa.us 454 : 459 : generation = worker->generation;
455 : :
2642 peter_e@gmx.net 456 : 459 : LWLockRelease(LogicalRepWorkerLock);
457 : :
458 : : /* Register the new dynamic worker. */
2555 tgl@sss.pgh.pa.us 459 : 459 : memset(&bgw, 0, sizeof(bgw));
2524 bruce@momjian.us 460 : 459 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
461 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
2642 peter_e@gmx.net 462 : 459 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
286 nathan@postgresql.or 463 :GNC 459 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
464 : :
236 akapila@postgresql.o 465 [ + + + - : 459 : switch (worker->type)
- ]
466 : : {
467 : 276 : case WORKERTYPE_APPLY:
468 : 276 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
469 : 276 : snprintf(bgw.bgw_name, BGW_MAXLEN,
470 : : "logical replication apply worker for subscription %u",
471 : : subid);
472 : 276 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
473 : 276 : break;
474 : :
475 : 11 : case WORKERTYPE_PARALLEL_APPLY:
476 : 11 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
477 : 11 : snprintf(bgw.bgw_name, BGW_MAXLEN,
478 : : "logical replication parallel apply worker for subscription %u",
479 : : subid);
480 : 11 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
481 : :
482 : 11 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
483 : 11 : break;
484 : :
485 : 172 : case WORKERTYPE_TABLESYNC:
486 : 172 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
487 : 172 : snprintf(bgw.bgw_name, BGW_MAXLEN,
488 : : "logical replication tablesync worker for subscription %u sync %u",
489 : : subid,
490 : : relid);
491 : 172 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
492 : 172 : break;
493 : :
236 akapila@postgresql.o 494 :UNC 0 : case WORKERTYPE_UNKNOWN:
495 : : /* Should never happen. */
496 [ # # ]: 0 : elog(ERROR, "unknown worker type");
497 : : }
498 : :
2642 peter_e@gmx.net 499 :CBC 459 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
500 : 459 : bgw.bgw_notify_pid = MyProcPid;
2552 fujii@postgresql.org 501 : 459 : bgw.bgw_main_arg = Int32GetDatum(slot);
502 : :
2642 peter_e@gmx.net 503 [ + + ]: 459 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
504 : : {
505 : : /* Failed to start worker, so clean up the worker slot. */
2400 tgl@sss.pgh.pa.us 506 : 4 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
507 [ - + ]: 4 : Assert(generation == worker->generation);
508 : 4 : logicalrep_worker_cleanup(worker);
509 : 4 : LWLockRelease(LogicalRepWorkerLock);
510 : :
2642 peter_e@gmx.net 511 [ + - ]: 4 : ereport(WARNING,
512 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
513 : : errmsg("out of background worker slots"),
514 : : errhint("You might need to increase %s.", "max_worker_processes")));
461 akapila@postgresql.o 515 : 4 : return false;
516 : : }
517 : :
518 : : /* Now wait until it attaches. */
519 : 455 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
520 : : }
521 : :
522 : : /*
523 : : * Internal function to stop the worker and wait until it detaches from the
524 : : * slot.
525 : : */
526 : : static void
527 : 60 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
528 : : {
529 : : uint16 generation;
530 : :
531 [ - + ]: 60 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
532 : :
533 : : /*
534 : : * Remember which generation was our worker so we can check if what we see
535 : : * is still the same one.
536 : : */
2545 peter_e@gmx.net 537 : 60 : generation = worker->generation;
538 : :
539 : : /*
540 : : * If we found a worker but it does not have proc set then it is still
541 : : * starting up; wait for it to finish starting and then kill it.
542 : : */
543 [ + - - + ]: 60 : while (worker->in_use && !worker->proc)
544 : : {
545 : : int rc;
546 : :
2642 peter_e@gmx.net 547 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
548 : :
549 : : /* Wait a bit --- we don't expect to have to wait long. */
2504 andres@anarazel.de 550 : 0 : rc = WaitLatch(MyLatch,
551 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
552 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
553 : :
554 [ # # ]: 0 : if (rc & WL_LATCH_SET)
555 : : {
556 : 0 : ResetLatch(MyLatch);
557 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
558 : : }
559 : :
560 : : /* Recheck worker status. */
2642 peter_e@gmx.net 561 : 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
562 : :
563 : : /*
564 : : * Check whether the worker slot is no longer used, which would mean
565 : : * that the worker has exited, or whether the worker generation is
566 : : * different, meaning that a different worker has taken the slot.
567 : : */
2545 568 [ # # # # ]: 0 : if (!worker->in_use || worker->generation != generation)
2638 569 : 0 : return;
570 : :
571 : : /* Worker has assigned proc, so it has started. */
572 [ # # ]: 0 : if (worker->proc)
2642 573 : 0 : break;
574 : : }
575 : :
576 : : /* Now terminate the worker ... */
461 akapila@postgresql.o 577 :CBC 60 : kill(worker->proc->pid, signo);
578 : :
579 : : /* ... and wait for it to die. */
580 : : for (;;)
2642 peter_e@gmx.net 581 : 79 : {
582 : : int rc;
583 : :
584 : : /* is it gone? */
2545 585 [ + + + - ]: 139 : if (!worker->proc || worker->generation != generation)
586 : : break;
587 : :
2479 tgl@sss.pgh.pa.us 588 : 79 : LWLockRelease(LogicalRepWorkerLock);
589 : :
590 : : /* Wait a bit --- we don't expect to have to wait long. */
2504 andres@anarazel.de 591 : 79 : rc = WaitLatch(MyLatch,
592 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
593 : : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
594 : :
595 [ + + ]: 79 : if (rc & WL_LATCH_SET)
596 : : {
597 : 22 : ResetLatch(MyLatch);
598 [ + + ]: 22 : CHECK_FOR_INTERRUPTS();
599 : : }
600 : :
2479 tgl@sss.pgh.pa.us 601 : 79 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
602 : : }
603 : : }
604 : :
605 : : /*
606 : : * Stop the logical replication worker for subid/relid, if any.
607 : : */
608 : : void
461 akapila@postgresql.o 609 : 71 : logicalrep_worker_stop(Oid subid, Oid relid)
610 : : {
611 : : LogicalRepWorker *worker;
612 : :
613 : 71 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
614 : :
615 : 71 : worker = logicalrep_worker_find(subid, relid, false);
616 : :
617 [ + + ]: 71 : if (worker)
618 : : {
619 [ + - - + ]: 53 : Assert(!isParallelApplyWorker(worker));
620 : 53 : logicalrep_worker_stop_internal(worker, SIGTERM);
621 : : }
622 : :
623 : 71 : LWLockRelease(LogicalRepWorkerLock);
624 : 71 : }
625 : :
626 : : /*
627 : : * Stop the given logical replication parallel apply worker.
628 : : *
629 : : * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
630 : : * worker so that the worker exits cleanly.
631 : : */
632 : : void
341 633 : 5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
634 : : {
635 : : int slot_no;
636 : : uint16 generation;
637 : : LogicalRepWorker *worker;
638 : :
639 [ - + ]: 5 : SpinLockAcquire(&winfo->shared->mutex);
640 : 5 : generation = winfo->shared->logicalrep_worker_generation;
641 : 5 : slot_no = winfo->shared->logicalrep_worker_slot_no;
642 : 5 : SpinLockRelease(&winfo->shared->mutex);
643 : :
461 644 [ + - - + ]: 5 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
645 : :
646 : : /*
647 : : * Detach from the error_mq_handle for the parallel apply worker before
648 : : * stopping it. This prevents the leader apply worker from trying to
649 : : * receive the message from the error queue that might already be detached
650 : : * by the parallel apply worker.
651 : : */
341 652 [ + - ]: 5 : if (winfo->error_mq_handle)
653 : : {
654 : 5 : shm_mq_detach(winfo->error_mq_handle);
655 : 5 : winfo->error_mq_handle = NULL;
656 : : }
657 : :
461 658 : 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
659 : :
660 : 5 : worker = &LogicalRepCtx->workers[slot_no];
661 [ + - - + ]: 5 : Assert(isParallelApplyWorker(worker));
662 : :
663 : : /*
664 : : * Only stop the worker if the generation matches and the worker is alive.
665 : : */
666 [ + - + - ]: 5 : if (worker->generation == generation && worker->proc)
667 : 5 : logicalrep_worker_stop_internal(worker, SIGINT);
668 : :
2479 tgl@sss.pgh.pa.us 669 : 5 : LWLockRelease(LogicalRepWorkerLock);
2642 peter_e@gmx.net 670 : 5 : }
671 : :
672 : : /*
673 : : * Wake up (using latch) any logical replication worker for specified sub/rel.
674 : : */
675 : : void
2579 676 : 191 : logicalrep_worker_wakeup(Oid subid, Oid relid)
677 : : {
678 : : LogicalRepWorker *worker;
679 : :
680 : 191 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
681 : :
682 : 191 : worker = logicalrep_worker_find(subid, relid, true);
683 : :
684 [ + - ]: 191 : if (worker)
685 : 191 : logicalrep_worker_wakeup_ptr(worker);
686 : :
2480 tgl@sss.pgh.pa.us 687 : 191 : LWLockRelease(LogicalRepWorkerLock);
2579 peter_e@gmx.net 688 : 191 : }
689 : :
690 : : /*
691 : : * Wake up (using latch) the specified logical replication worker.
692 : : *
693 : : * Caller must hold lock, else worker->proc could change under us.
694 : : */
695 : : void
696 : 564 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
697 : : {
2480 tgl@sss.pgh.pa.us 698 [ - + ]: 564 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
699 : :
2579 peter_e@gmx.net 700 : 564 : SetLatch(&worker->proc->procLatch);
701 : 564 : }
702 : :
703 : : /*
704 : : * Attach to a slot.
705 : : */
706 : : void
2642 707 : 452 : logicalrep_worker_attach(int slot)
708 : : {
709 : : /* Block concurrent access. */
710 : 452 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
711 : :
712 [ + - - + ]: 452 : Assert(slot >= 0 && slot < max_logical_replication_workers);
713 : 452 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
714 : :
2545 715 [ - + ]: 452 : if (!MyLogicalRepWorker->in_use)
716 : : {
2545 peter_e@gmx.net 717 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
718 [ # # ]: 0 : ereport(ERROR,
719 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
720 : : errmsg("logical replication worker slot %d is empty, cannot attach",
721 : : slot)));
722 : : }
723 : :
2642 peter_e@gmx.net 724 [ - + ]:CBC 452 : if (MyLogicalRepWorker->proc)
725 : : {
2545 peter_e@gmx.net 726 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
2642 727 [ # # ]: 0 : ereport(ERROR,
728 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
729 : : errmsg("logical replication worker slot %d is already used by "
730 : : "another worker, cannot attach", slot)));
731 : : }
732 : :
2642 peter_e@gmx.net 733 :CBC 452 : MyLogicalRepWorker->proc = MyProc;
734 : 452 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
735 : :
736 : 452 : LWLockRelease(LogicalRepWorkerLock);
737 : 452 : }
738 : :
739 : : /*
740 : : * Stop the parallel apply workers if any, and detach the leader apply worker
741 : : * (cleans up the worker info).
742 : : */
743 : : static void
744 : 442 : logicalrep_worker_detach(void)
745 : : {
746 : : /* Stop the parallel apply workers. */
461 akapila@postgresql.o 747 [ + + ]: 442 : if (am_leader_apply_worker())
748 : : {
749 : : List *workers;
750 : : ListCell *lc;
751 : :
752 : : /*
753 : : * Detach from the error_mq_handle for all parallel apply workers
754 : : * before terminating them. This prevents the leader apply worker from
755 : : * receiving the worker termination message and sending it to logs
756 : : * when the same is already done by the parallel worker.
757 : : */
758 : 263 : pa_detach_all_error_mq();
759 : :
760 : 263 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
761 : :
762 : 263 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
763 [ + - + + : 529 : foreach(lc, workers)
+ + ]
764 : : {
765 : 266 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
766 : :
767 [ + - + + ]: 266 : if (isParallelApplyWorker(w))
768 : 2 : logicalrep_worker_stop_internal(w, SIGTERM);
769 : : }
770 : :
771 : 263 : LWLockRelease(LogicalRepWorkerLock);
772 : : }
773 : :
774 : : /* Block concurrent access. */
2642 peter_e@gmx.net 775 : 442 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
776 : :
2545 777 : 442 : logicalrep_worker_cleanup(MyLogicalRepWorker);
778 : :
2642 779 : 442 : LWLockRelease(LogicalRepWorkerLock);
780 : 442 : }
781 : :
782 : : /*
783 : : * Clean up worker info.
784 : : */
785 : : static void
2545 786 : 446 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
787 : : {
788 [ - + ]: 446 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
789 : :
233 akapila@postgresql.o 790 :GNC 446 : worker->type = WORKERTYPE_UNKNOWN;
2545 peter_e@gmx.net 791 :CBC 446 : worker->in_use = false;
792 : 446 : worker->proc = NULL;
793 : 446 : worker->dbid = InvalidOid;
794 : 446 : worker->userid = InvalidOid;
795 : 446 : worker->subid = InvalidOid;
796 : 446 : worker->relid = InvalidOid;
452 akapila@postgresql.o 797 : 446 : worker->leader_pid = InvalidPid;
461 798 : 446 : worker->parallel_apply = false;
2545 peter_e@gmx.net 799 : 446 : }
800 : :
801 : : /*
802 : : * Cleanup function for logical replication launcher.
803 : : *
804 : : * Called on logical replication launcher exit.
805 : : */
806 : : static void
2552 fujii@postgresql.org 807 : 358 : logicalrep_launcher_onexit(int code, Datum arg)
808 : : {
809 : 358 : LogicalRepCtx->launcher_pid = 0;
810 : 358 : }
811 : :
812 : : /*
813 : : * Cleanup function.
814 : : *
815 : : * Called on logical replication worker exit.
816 : : */
817 : : static void
2642 peter_e@gmx.net 818 : 442 : logicalrep_worker_onexit(int code, Datum arg)
819 : : {
820 : : /* Disconnect gracefully from the remote side. */
1068 alvherre@alvh.no-ip. 821 [ + + ]: 442 : if (LogRepWorkerWalRcvConn)
822 : 335 : walrcv_disconnect(LogRepWorkerWalRcvConn);
823 : :
2642 peter_e@gmx.net 824 : 442 : logicalrep_worker_detach();
825 : :
826 : : /* Cleanup fileset used for streaming transactions. */
955 akapila@postgresql.o 827 [ + + ]: 442 : if (MyLogicalRepWorker->stream_fileset != NULL)
828 : 14 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
829 : :
830 : : /*
831 : : * Session level locks may be acquired outside of a transaction in
832 : : * parallel apply mode and will not be released when the worker
833 : : * terminates, so manually release all locks before the worker exits.
834 : : *
835 : : * The locks will be acquired once the worker is initialized.
836 : : */
347 837 [ + + ]: 442 : if (!InitializingApplyWorker)
838 : 438 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
839 : :
2509 peter_e@gmx.net 840 : 442 : ApplyLauncherWakeup();
2642 841 : 442 : }
842 : :
843 : : /*
844 : : * Count the number of registered (not necessarily running) sync workers
845 : : * for a subscription.
846 : : */
847 : : int
2579 848 : 1144 : logicalrep_sync_worker_count(Oid subid)
849 : : {
850 : : int i;
2524 bruce@momjian.us 851 : 1144 : int res = 0;
852 : :
2579 peter_e@gmx.net 853 [ - + ]: 1144 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
854 : :
855 : : /* Search for attached worker for a given subscription id. */
856 [ + + ]: 6044 : for (i = 0; i < max_logical_replication_workers; i++)
857 : : {
2524 bruce@momjian.us 858 : 4900 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
859 : :
233 akapila@postgresql.o 860 [ + + + + :GNC 4900 : if (isTablesyncWorker(w) && w->subid == subid)
+ + ]
2579 peter_e@gmx.net 861 :CBC 1109 : res++;
862 : : }
863 : :
864 : 1144 : return res;
865 : : }
866 : :
867 : : /*
868 : : * Count the number of registered (but not necessarily running) parallel apply
869 : : * workers for a subscription.
870 : : */
871 : : static int
461 akapila@postgresql.o 872 : 459 : logicalrep_pa_worker_count(Oid subid)
873 : : {
874 : : int i;
875 : 459 : int res = 0;
876 : :
877 [ - + ]: 459 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
878 : :
879 : : /*
880 : : * Scan all attached parallel apply workers, only counting those which
881 : : * have the given subscription id.
882 : : */
883 [ + + ]: 2537 : for (i = 0; i < max_logical_replication_workers; i++)
884 : : {
885 : 2078 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
886 : :
233 akapila@postgresql.o 887 [ + + + + :GNC 2078 : if (isParallelApplyWorker(w) && w->subid == subid)
+ - ]
461 akapila@postgresql.o 888 :CBC 2 : res++;
889 : : }
890 : :
891 : 459 : return res;
892 : : }
893 : :
894 : : /*
895 : : * ApplyLauncherShmemSize
896 : : * Compute space needed for replication launcher shared memory
897 : : */
898 : : Size
2642 peter_e@gmx.net 899 : 3475 : ApplyLauncherShmemSize(void)
900 : : {
901 : : Size size;
902 : :
903 : : /*
904 : : * Need the fixed struct and the array of LogicalRepWorker.
905 : : */
906 : 3475 : size = sizeof(LogicalRepCtxStruct);
907 : 3475 : size = MAXALIGN(size);
908 : 3475 : size = add_size(size, mul_size(max_logical_replication_workers,
909 : : sizeof(LogicalRepWorker)));
910 : 3475 : return size;
911 : : }
912 : :
913 : : /*
914 : : * ApplyLauncherRegister
915 : : * Register a background worker running the logical replication launcher.
916 : : */
917 : : void
918 : 733 : ApplyLauncherRegister(void)
919 : : {
920 : : BackgroundWorker bgw;
921 : :
922 : : /*
923 : : * The logical replication launcher is disabled during binary upgrades, to
924 : : * prevent logical replication workers from running on the source cluster.
925 : : * That could cause replication origins to move forward after having been
926 : : * copied to the target cluster, potentially creating conflicts with the
927 : : * copied data files.
928 : : */
93 michael@paquier.xyz 929 [ + + + + ]:GNC 733 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
2642 peter_e@gmx.net 930 :GBC 24 : return;
931 : :
2555 tgl@sss.pgh.pa.us 932 :CBC 709 : memset(&bgw, 0, sizeof(bgw));
2524 bruce@momjian.us 933 : 709 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
934 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
2642 peter_e@gmx.net 935 : 709 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
286 nathan@postgresql.or 936 :GNC 709 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
2571 rhaas@postgresql.org 937 :CBC 709 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
2642 peter_e@gmx.net 938 : 709 : snprintf(bgw.bgw_name, BGW_MAXLEN,
939 : : "logical replication launcher");
2418 940 : 709 : snprintf(bgw.bgw_type, BGW_MAXLEN,
941 : : "logical replication launcher");
2642 942 : 709 : bgw.bgw_restart_time = 5;
943 : 709 : bgw.bgw_notify_pid = 0;
944 : 709 : bgw.bgw_main_arg = (Datum) 0;
945 : :
946 : 709 : RegisterBackgroundWorker(&bgw);
947 : : }
948 : :
949 : : /*
950 : : * ApplyLauncherShmemInit
951 : : * Allocate and initialize replication launcher shared memory
952 : : */
953 : : void
954 : 898 : ApplyLauncherShmemInit(void)
955 : : {
956 : : bool found;
957 : :
958 : 898 : LogicalRepCtx = (LogicalRepCtxStruct *)
959 : 898 : ShmemInitStruct("Logical Replication Launcher Data",
960 : : ApplyLauncherShmemSize(),
961 : : &found);
962 : :
963 [ + - ]: 898 : if (!found)
964 : : {
965 : : int slot;
966 : :
967 : 898 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
968 : :
445 tgl@sss.pgh.pa.us 969 : 898 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
970 : 898 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
971 : :
972 : : /* Initialize memory and spin locks for each worker slot. */
2579 peter_e@gmx.net 973 [ + + ]: 4473 : for (slot = 0; slot < max_logical_replication_workers; slot++)
974 : : {
975 : 3575 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
976 : :
977 : 3575 : memset(worker, 0, sizeof(LogicalRepWorker));
978 : 3575 : SpinLockInit(&worker->relmutex);
979 : : }
980 : : }
2642 981 : 898 : }
982 : :
983 : : /*
984 : : * Initialize or attach to the dynamic shared hash table that stores the
985 : : * last-start times, if not already done.
986 : : * This must be called before accessing the table.
987 : : */
988 : : static void
448 tgl@sss.pgh.pa.us 989 : 1041 : logicalrep_launcher_attach_dshmem(void)
990 : : {
991 : : MemoryContext oldcontext;
992 : :
993 : : /* Quick exit if we already did this. */
445 994 [ + + ]: 1041 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
448 995 [ + + ]: 977 : last_start_times != NULL)
996 : 878 : return;
997 : :
998 : : /* Otherwise, use a lock to ensure only one process creates the table. */
999 : 163 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1000 : :
1001 : : /* Be sure any local memory allocated by DSA routines is persistent. */
1002 : 163 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1003 : :
445 1004 [ + + ]: 163 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1005 : : {
1006 : : /* Initialize dynamic shared hash table for last-start times. */
448 1007 : 64 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1008 : 64 : dsa_pin(last_start_times_dsa);
1009 : 64 : dsa_pin_mapping(last_start_times_dsa);
48 nathan@postgresql.or 1010 :GNC 64 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1011 : :
1012 : : /* Store handles in shared memory for other backends to use. */
448 tgl@sss.pgh.pa.us 1013 :CBC 64 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1014 : 64 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1015 : : }
1016 [ + - ]: 99 : else if (!last_start_times)
1017 : : {
1018 : : /* Attach to existing dynamic shared hash table. */
1019 : 99 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1020 : 99 : dsa_pin_mapping(last_start_times_dsa);
1021 : 99 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1022 : 99 : LogicalRepCtx->last_start_dsh, 0);
1023 : : }
1024 : :
1025 : 163 : MemoryContextSwitchTo(oldcontext);
1026 : 163 : LWLockRelease(LogicalRepWorkerLock);
1027 : : }
1028 : :
1029 : : /*
1030 : : * Set the last-start time for the subscription.
1031 : : */
1032 : : static void
1033 : 276 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1034 : : {
1035 : : LauncherLastStartTimesEntry *entry;
1036 : : bool found;
1037 : :
1038 : 276 : logicalrep_launcher_attach_dshmem();
1039 : :
1040 : 276 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1041 : 276 : entry->last_start_time = start_time;
1042 : 276 : dshash_release_lock(last_start_times, entry);
1043 : 276 : }
1044 : :
1045 : : /*
1046 : : * Return the last-start time for the subscription, or 0 if there isn't one.
1047 : : */
1048 : : static TimestampTz
1049 : 637 : ApplyLauncherGetWorkerStartTime(Oid subid)
1050 : : {
1051 : : LauncherLastStartTimesEntry *entry;
1052 : : TimestampTz ret;
1053 : :
1054 : 637 : logicalrep_launcher_attach_dshmem();
1055 : :
1056 : 637 : entry = dshash_find(last_start_times, &subid, false);
1057 [ + + ]: 637 : if (entry == NULL)
1058 : 149 : return 0;
1059 : :
1060 : 488 : ret = entry->last_start_time;
1061 : 488 : dshash_release_lock(last_start_times, entry);
1062 : :
1063 : 488 : return ret;
1064 : : }
1065 : :
1066 : : /*
1067 : : * Remove the last-start-time entry for the subscription, if one exists.
1068 : : *
1069 : : * This has two use-cases: to remove the entry related to a subscription
1070 : : * that's been deleted or disabled (just to avoid leaking shared memory),
1071 : : * and to allow immediate restart of an apply worker that has exited
1072 : : * due to subscription parameter changes.
1073 : : */
1074 : : void
1075 : 128 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1076 : : {
1077 : 128 : logicalrep_launcher_attach_dshmem();
1078 : :
1079 : 128 : (void) dshash_delete_key(last_start_times, &subid);
1080 : 128 : }
1081 : :
1082 : : /*
1083 : : * Wakeup the launcher on commit if requested.
1084 : : */
1085 : : void
2540 peter_e@gmx.net 1086 : 432906 : AtEOXact_ApplyLauncher(bool isCommit)
1087 : : {
2445 1088 [ + + ]: 432906 : if (isCommit)
1089 : : {
1090 [ + + ]: 409700 : if (on_commit_launcher_wakeup)
1091 : 116 : ApplyLauncherWakeup();
1092 : : }
1093 : :
2540 1094 : 432906 : on_commit_launcher_wakeup = false;
2642 1095 : 432906 : }
1096 : :
1097 : : /*
1098 : : * Request wakeup of the launcher on commit of the transaction.
1099 : : *
1100 : : * This is used to send launcher signal to stop sleeping and process the
1101 : : * subscriptions when current transaction commits. Should be used when new
1102 : : * tuple was added to the pg_subscription catalog.
1103 : : */
1104 : : void
1105 : 116 : ApplyLauncherWakeupAtCommit(void)
1106 : : {
2624 heikki.linnakangas@i 1107 [ + - ]: 116 : if (!on_commit_launcher_wakeup)
1108 : 116 : on_commit_launcher_wakeup = true;
2642 peter_e@gmx.net 1109 : 116 : }
1110 : :
1111 : : static void
1112 : 558 : ApplyLauncherWakeup(void)
1113 : : {
2552 fujii@postgresql.org 1114 [ + + ]: 558 : if (LogicalRepCtx->launcher_pid != 0)
2642 peter_e@gmx.net 1115 : 537 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1116 : 558 : }
1117 : :
1118 : : /*
1119 : : * Main loop for the apply launcher process.
1120 : : */
1121 : : void
1122 : 618 : ApplyLauncherMain(Datum main_arg)
1123 : : {
2592 tgl@sss.pgh.pa.us 1124 [ + + ]: 618 : ereport(DEBUG1,
1125 : : (errmsg_internal("logical replication launcher started")));
1126 : :
2552 fujii@postgresql.org 1127 : 618 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1128 : :
2502 andres@anarazel.de 1129 [ - + ]: 618 : Assert(LogicalRepCtx->launcher_pid == 0);
1130 : 618 : LogicalRepCtx->launcher_pid = MyProcPid;
1131 : :
1132 : : /* Establish signal handlers. */
1367 akapila@postgresql.o 1133 : 618 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
2502 andres@anarazel.de 1134 : 618 : pqsignal(SIGTERM, die);
2642 peter_e@gmx.net 1135 : 618 : BackgroundWorkerUnblockSignals();
1136 : :
1137 : : /*
1138 : : * Establish connection to nailed catalogs (we only ever access
1139 : : * pg_subscription).
1140 : : */
2201 magnus@hagander.net 1141 : 618 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1142 : :
1143 : : /* Enter main loop */
1144 : : for (;;)
2642 peter_e@gmx.net 1145 : 2507 : {
1146 : : int rc;
1147 : : List *sublist;
1148 : : ListCell *lc;
1149 : : MemoryContext subctx;
1150 : : MemoryContext oldctx;
2524 bruce@momjian.us 1151 : 3125 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1152 : :
2502 andres@anarazel.de 1153 [ + + ]: 3125 : CHECK_FOR_INTERRUPTS();
1154 : :
1155 : : /* Use temporary context to avoid leaking memory across cycles. */
448 tgl@sss.pgh.pa.us 1156 : 3124 : subctx = AllocSetContextCreate(TopMemoryContext,
1157 : : "Logical Replication Launcher sublist",
1158 : : ALLOCSET_DEFAULT_SIZES);
1159 : 3124 : oldctx = MemoryContextSwitchTo(subctx);
1160 : :
1161 : : /* Start any missing workers for enabled subscriptions. */
1162 : 3124 : sublist = get_subscription_list();
1163 [ + + + + : 4136 : foreach(lc, sublist)
+ + ]
1164 : : {
1165 : 1014 : Subscription *sub = (Subscription *) lfirst(lc);
1166 : : LogicalRepWorker *w;
1167 : : TimestampTz last_start;
1168 : : TimestampTz now;
1169 : : long elapsed;
1170 : :
1171 [ + + ]: 1014 : if (!sub->enabled)
1172 : 31 : continue;
1173 : :
1174 : 983 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1175 : 983 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1176 : 983 : LWLockRelease(LogicalRepWorkerLock);
1177 : :
1178 [ + + ]: 983 : if (w != NULL)
1179 : 346 : continue; /* worker is running already */
1180 : :
1181 : : /*
1182 : : * If the worker is eligible to start now, launch it. Otherwise,
1183 : : * adjust wait_time so that we'll wake up as soon as it can be
1184 : : * started.
1185 : : *
1186 : : * Each subscription's apply worker can only be restarted once per
1187 : : * wal_retrieve_retry_interval, so that errors do not cause us to
1188 : : * repeatedly restart the worker as fast as possible. In cases
1189 : : * where a restart is expected (e.g., subscription parameter
1190 : : * changes), another process should remove the last-start entry
1191 : : * for the subscription so that the worker can be restarted
1192 : : * without waiting for wal_retrieve_retry_interval to elapse.
1193 : : */
1194 : 637 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1195 : 637 : now = GetCurrentTimestamp();
1196 [ + + ]: 637 : if (last_start == 0 ||
1197 [ + + ]: 488 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1198 : : {
1199 : 276 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
244 akapila@postgresql.o 1200 :GNC 276 : logicalrep_worker_launch(WORKERTYPE_APPLY,
1201 : 276 : sub->dbid, sub->oid, sub->name,
1202 : : sub->owner, InvalidOid,
1203 : : DSM_HANDLE_INVALID);
1204 : : }
1205 : : else
1206 : : {
448 tgl@sss.pgh.pa.us 1207 :CBC 361 : wait_time = Min(wait_time,
1208 : : wal_retrieve_retry_interval - elapsed);
1209 : : }
1210 : : }
1211 : :
1212 : : /* Switch back to original memory context. */
1213 : 3122 : MemoryContextSwitchTo(oldctx);
1214 : : /* Clean the temporary memory. */
1215 : 3122 : MemoryContextDelete(subctx);
1216 : :
1217 : : /* Wait for more work. */
2504 andres@anarazel.de 1218 : 3122 : rc = WaitLatch(MyLatch,
1219 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1220 : : wait_time,
1221 : : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1222 : :
1223 [ + + ]: 2861 : if (rc & WL_LATCH_SET)
1224 : : {
1225 : 2774 : ResetLatch(MyLatch);
1226 [ + + ]: 2774 : CHECK_FOR_INTERRUPTS();
1227 : : }
1228 : :
1580 rhaas@postgresql.org 1229 [ + + ]: 2507 : if (ConfigReloadPending)
1230 : : {
1231 : 100 : ConfigReloadPending = false;
2561 peter_e@gmx.net 1232 : 100 : ProcessConfigFile(PGC_SIGHUP);
1233 : : }
1234 : : }
1235 : :
1236 : : /* Not reachable */
1237 : : }
1238 : :
1239 : : /*
1240 : : * Is current process the logical replication launcher?
1241 : : */
1242 : : bool
2502 andres@anarazel.de 1243 : 370 : IsLogicalLauncher(void)
1244 : : {
1245 : 370 : return LogicalRepCtx->launcher_pid == MyProcPid;
1246 : : }
1247 : :
1248 : : /*
1249 : : * Return the pid of the leader apply worker if the given pid is the pid of a
1250 : : * parallel apply worker, otherwise, return InvalidPid.
1251 : : */
1252 : : pid_t
452 akapila@postgresql.o 1253 : 794 : GetLeaderApplyWorkerPid(pid_t pid)
1254 : : {
1255 : 794 : int leader_pid = InvalidPid;
1256 : : int i;
1257 : :
1258 : 794 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1259 : :
1260 [ + + ]: 3970 : for (i = 0; i < max_logical_replication_workers; i++)
1261 : : {
1262 : 3176 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1263 : :
1264 [ + + - + : 3176 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
- - - - ]
1265 : : {
452 akapila@postgresql.o 1266 :UBC 0 : leader_pid = w->leader_pid;
1267 : 0 : break;
1268 : : }
1269 : : }
1270 : :
452 akapila@postgresql.o 1271 :CBC 794 : LWLockRelease(LogicalRepWorkerLock);
1272 : :
1273 : 794 : return leader_pid;
1274 : : }
1275 : :
1276 : : /*
1277 : : * Returns state of the subscriptions.
1278 : : */
1279 : : Datum
2642 peter_e@gmx.net 1280 : 1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1281 : : {
1282 : : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1283 [ - + ]: 1 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1284 : : int i;
1285 : 1 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1286 : :
544 michael@paquier.xyz 1287 : 1 : InitMaterializedSRF(fcinfo, 0);
1288 : :
1289 : : /* Make sure we get consistent view of the workers. */
2642 peter_e@gmx.net 1290 : 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1291 : :
677 tgl@sss.pgh.pa.us 1292 [ + + ]: 5 : for (i = 0; i < max_logical_replication_workers; i++)
1293 : : {
1294 : : /* for each row */
638 peter@eisentraut.org 1295 : 4 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1296 : 4 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1297 : : int worker_pid;
1298 : : LogicalRepWorker worker;
1299 : :
2642 peter_e@gmx.net 1300 : 4 : memcpy(&worker, &LogicalRepCtx->workers[i],
1301 : : sizeof(LogicalRepWorker));
1302 [ + + - + ]: 4 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1303 : 2 : continue;
1304 : :
1305 [ - + - - ]: 2 : if (OidIsValid(subid) && worker.subid != subid)
2642 peter_e@gmx.net 1306 :UBC 0 : continue;
1307 : :
2642 peter_e@gmx.net 1308 :CBC 2 : worker_pid = worker.proc->pid;
1309 : :
1310 : 2 : values[0] = ObjectIdGetDatum(worker.subid);
244 akapila@postgresql.o 1311 [ + - - + ]:GNC 2 : if (isTablesyncWorker(&worker))
2579 peter_e@gmx.net 1312 :UBC 0 : values[1] = ObjectIdGetDatum(worker.relid);
1313 : : else
2579 peter_e@gmx.net 1314 :CBC 2 : nulls[1] = true;
1315 : 2 : values[2] = Int32GetDatum(worker_pid);
1316 : :
452 akapila@postgresql.o 1317 [ + - - + ]: 2 : if (isParallelApplyWorker(&worker))
452 akapila@postgresql.o 1318 :UBC 0 : values[3] = Int32GetDatum(worker.leader_pid);
1319 : : else
2579 peter_e@gmx.net 1320 :CBC 2 : nulls[3] = true;
1321 : :
452 akapila@postgresql.o 1322 [ - + ]: 2 : if (XLogRecPtrIsInvalid(worker.last_lsn))
452 akapila@postgresql.o 1323 :UBC 0 : nulls[4] = true;
1324 : : else
452 akapila@postgresql.o 1325 :CBC 2 : values[4] = LSNGetDatum(worker.last_lsn);
2642 peter_e@gmx.net 1326 [ - + ]: 2 : if (worker.last_send_time == 0)
452 akapila@postgresql.o 1327 :UBC 0 : nulls[5] = true;
1328 : : else
452 akapila@postgresql.o 1329 :CBC 2 : values[5] = TimestampTzGetDatum(worker.last_send_time);
2642 peter_e@gmx.net 1330 [ - + ]: 2 : if (worker.last_recv_time == 0)
452 akapila@postgresql.o 1331 :UBC 0 : nulls[6] = true;
1332 : : else
452 akapila@postgresql.o 1333 :CBC 2 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
2642 peter_e@gmx.net 1334 [ - + ]: 2 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
452 akapila@postgresql.o 1335 :UBC 0 : nulls[7] = true;
1336 : : else
452 akapila@postgresql.o 1337 :CBC 2 : values[7] = LSNGetDatum(worker.reply_lsn);
2642 peter_e@gmx.net 1338 [ - + ]: 2 : if (worker.reply_time == 0)
452 akapila@postgresql.o 1339 :UBC 0 : nulls[8] = true;
1340 : : else
452 akapila@postgresql.o 1341 :CBC 2 : values[8] = TimestampTzGetDatum(worker.reply_time);
1342 : :
202 nathan@postgresql.or 1343 [ + - - - :GNC 2 : switch (worker.type)
- ]
1344 : : {
1345 : 2 : case WORKERTYPE_APPLY:
1346 : 2 : values[9] = CStringGetTextDatum("apply");
1347 : 2 : break;
202 nathan@postgresql.or 1348 :UNC 0 : case WORKERTYPE_PARALLEL_APPLY:
1349 : 0 : values[9] = CStringGetTextDatum("parallel apply");
1350 : 0 : break;
1351 : 0 : case WORKERTYPE_TABLESYNC:
1352 : 0 : values[9] = CStringGetTextDatum("table synchronization");
1353 : 0 : break;
1354 : 0 : case WORKERTYPE_UNKNOWN:
1355 : : /* Should never happen. */
1356 [ # # ]: 0 : elog(ERROR, "unknown worker type");
1357 : : }
1358 : :
769 michael@paquier.xyz 1359 :CBC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1360 : : values, nulls);
1361 : :
1362 : : /*
1363 : : * If only a single subscription was requested, and we found it,
1364 : : * break.
1365 : : */
2642 peter_e@gmx.net 1366 [ - + ]: 2 : if (OidIsValid(subid))
2642 peter_e@gmx.net 1367 :UBC 0 : break;
1368 : : }
1369 : :
2642 peter_e@gmx.net 1370 :CBC 1 : LWLockRelease(LogicalRepWorkerLock);
1371 : :
1372 : 1 : return (Datum) 0;
1373 : : }
|