Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "libpq/pqsignal.h"
30 : #include "miscadmin.h"
31 : #include "pgstat.h"
32 : #include "postmaster/bgworker.h"
33 : #include "postmaster/fork_process.h"
34 : #include "postmaster/interrupt.h"
35 : #include "postmaster/postmaster.h"
36 : #include "replication/logicallauncher.h"
37 : #include "replication/logicalworker.h"
38 : #include "replication/slot.h"
39 : #include "replication/walreceiver.h"
40 : #include "replication/worker_internal.h"
41 : #include "storage/ipc.h"
42 : #include "storage/proc.h"
43 : #include "storage/procarray.h"
44 : #include "storage/procsignal.h"
45 : #include "tcop/tcopprot.h"
46 : #include "utils/builtins.h"
47 : #include "utils/memutils.h"
48 : #include "utils/pg_lsn.h"
49 : #include "utils/ps_status.h"
50 : #include "utils/snapmgr.h"
51 : #include "utils/timeout.h"
52 :
/* max sleep time between cycles (3min, i.e. 180000 ms) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

/* GUC variables */
/* Number of slots in LogicalRepCtx->workers (see ApplyLauncherShmemSize()). */
int			max_logical_replication_workers = 4;
/* Per-subscription cap on table sync workers, checked in logicalrep_worker_launch(). */
int			max_sync_workers_per_subscription = 2;
/* Per-subscription cap on parallel apply workers, checked in logicalrep_worker_launch(). */
int			max_parallel_apply_workers_per_subscription = 2;

/* Slot this process attached to; set by logicalrep_worker_attach(). */
LogicalRepWorker *MyLogicalRepWorker = NULL;
62 :
/*
 * Shared state of the logical replication launcher, allocated with
 * ShmemInitStruct() in ApplyLauncherShmemInit().
 */
typedef struct LogicalRepCtxStruct
{
	/* Supervisor process; reset to 0 by logicalrep_launcher_onexit(). */
	pid_t		launcher_pid;

	/* Hash table holding last start times of subscriptions' apply workers. */
	dsa_handle	last_start_dsa;
	dshash_table_handle last_start_dsh;

	/*
	 * Background workers: max_logical_replication_workers slots (sized by
	 * ApplyLauncherShmemSize()).  The functions in this file read and modify
	 * the slots while holding LogicalRepWorkerLock.
	 */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

/* This process's pointer to the shared LogicalRepCtxStruct. */
static LogicalRepCtxStruct *LogicalRepCtx;
77 :
/* an entry in the last-start-times shared hash table */
typedef struct LauncherLastStartTimesEntry
{
	Oid			subid;			/* OID of logrep subscription (hash key) */
	TimestampTz last_start_time;	/* last time its apply worker was started */
} LauncherLastStartTimesEntry;

/*
 * parameters for the last-start-times shared hash table: the key is the
 * leading Oid of each entry, compared and hashed bytewise.
 */
static const dshash_parameters dsh_params = {
	sizeof(Oid),
	sizeof(LauncherLastStartTimesEntry),
	dshash_memcmp,
	dshash_memhash,
	LWTRANCHE_LAUNCHER_HASH
};

/*
 * This process's handles on the DSA area and dshash table whose shared
 * handles live in LogicalRepCtx.  NOTE(review): presumably filled in by
 * logicalrep_launcher_attach_dshmem(); its body is outside this view.
 */
static dsa_area *last_start_times_dsa = NULL;
static dshash_table *last_start_times = NULL;

/*
 * NOTE(review): name suggests a launcher wakeup requested for transaction
 * commit; the code that sets and consumes it is outside this view.
 */
static bool on_commit_launcher_wakeup = false;
98 :
99 :
/* Forward declarations of static functions defined in this file. */
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int	logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
/* Accessors for the last-start-times shared hash table, keyed by subscription. */
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
109 :
110 :
111 : /*
112 : * Load the list of subscriptions.
113 : *
114 : * Only the fields interesting for worker start/stop functions are filled for
115 : * each subscription.
116 : */
117 : static List *
2271 peter_e 118 GIC 2036 : get_subscription_list(void)
119 : {
120 2036 : List *res = NIL;
121 : Relation rel;
122 : TableScanDesc scan;
123 : HeapTuple tup;
124 : MemoryContext resultcxt;
125 :
126 : /* This is the context that we will allocate our output data in */
127 2036 : resultcxt = CurrentMemoryContext;
128 :
129 : /*
130 : * Start a transaction so we can access pg_database, and get a snapshot.
131 : * We don't have a use for the snapshot itself, but we're interested in
132 : * the secondary effect that it sets RecentGlobalXmin. (This is critical
133 : * for anything that reads heap pages, because HOT may decide to prune
134 : * them even if the process doesn't attempt to modify any tuples.)
135 : *
136 : * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
137 : * not pushed/active does not reliably prevent HOT pruning (->xmin could
138 : * e.g. be cleared when cache invalidations are processed).
139 : */
140 2036 : StartTransactionCommand();
141 2036 : (void) GetTransactionSnapshot();
142 :
1539 andres 143 2036 : rel = table_open(SubscriptionRelationId, AccessShareLock);
1490 144 2036 : scan = table_beginscan_catalog(rel, 0, NULL);
145 :
2271 peter_e 146 2412 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
147 : {
148 376 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
2153 bruce 149 ECB : Subscription *sub;
150 : MemoryContext oldcxt;
2271 peter_e 151 :
152 : /*
153 : * Allocate our results in the caller's context, not the
154 : * transaction's. We do this inside the loop, and restore the original
155 : * context at the end, so that leaky things like heap_getnext() are
156 : * not called in a potentially long-lived context.
157 : */
2271 peter_e 158 CBC 376 : oldcxt = MemoryContextSwitchTo(resultcxt);
159 :
2186 peter_e 160 GIC 376 : sub = (Subscription *) palloc0(sizeof(Subscription));
1601 andres 161 376 : sub->oid = subform->oid;
2271 peter_e 162 376 : sub->dbid = subform->subdbid;
163 376 : sub->owner = subform->subowner;
164 376 : sub->enabled = subform->subenabled;
165 376 : sub->name = pstrdup(NameStr(subform->subname));
166 : /* We don't fill fields we are not interested in. */
167 :
168 376 : res = lappend(res, sub);
169 376 : MemoryContextSwitchTo(oldcxt);
170 : }
2271 peter_e 171 ECB :
1490 andres 172 CBC 2036 : table_endscan(scan);
1539 andres 173 GIC 2036 : table_close(rel, AccessShareLock);
2271 peter_e 174 ECB :
2271 peter_e 175 CBC 2036 : CommitTransactionCommand();
176 :
177 2036 : return res;
178 : }
2271 peter_e 179 ECB :
180 : /*
181 : * Wait for a background worker to start up and attach to the shmem context.
182 : *
183 : * This is only needed for cleaning up the shared memory in case the worker
184 : * fails to attach.
185 : *
186 : * Returns whether the attach was successful.
187 : */
188 : static bool
2271 peter_e 189 GIC 274 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
190 : uint16 generation,
2271 peter_e 191 ECB : BackgroundWorkerHandle *handle)
192 : {
193 : BgwHandleStatus status;
194 : int rc;
195 :
196 : for (;;)
2271 peter_e 197 CBC 774 : {
2271 peter_e 198 ECB : pid_t pid;
199 :
2271 peter_e 200 GIC 1048 : CHECK_FOR_INTERRUPTS();
2271 peter_e 201 ECB :
2174 peter_e 202 CBC 1048 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
203 :
204 : /* Worker either died or has started. Return false if died. */
205 1048 : if (!worker->in_use || worker->proc)
2174 peter_e 206 ECB : {
2174 peter_e 207 GIC 274 : LWLockRelease(LogicalRepWorkerLock);
90 akapila 208 GNC 274 : return worker->in_use;
209 : }
2174 peter_e 210 ECB :
2174 peter_e 211 GIC 774 : LWLockRelease(LogicalRepWorkerLock);
212 :
213 : /* Check if worker has died before attaching, and clean up after it. */
2271 214 774 : status = GetBackgroundWorkerPid(handle, &pid);
215 :
216 774 : if (status == BGWH_STOPPED)
217 : {
2174 peter_e 218 UIC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
219 : /* Ensure that this was indeed the worker we waited for. */
220 0 : if (generation == worker->generation)
221 0 : logicalrep_worker_cleanup(worker);
2174 peter_e 222 LBC 0 : LWLockRelease(LogicalRepWorkerLock);
90 akapila 223 UNC 0 : return false;
224 : }
225 :
226 : /*
227 : * We need timeout because we generally don't get notified via latch
228 : * about the worker attach. But we don't expect to have to wait long.
229 : */
2271 peter_e 230 CBC 774 : rc = WaitLatch(MyLatch,
231 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
232 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
2271 peter_e 233 ECB :
2133 andres 234 GIC 774 : if (rc & WL_LATCH_SET)
2133 andres 235 ECB : {
2133 andres 236 GIC 296 : ResetLatch(MyLatch);
237 296 : CHECK_FOR_INTERRUPTS();
2133 andres 238 ECB : }
239 : }
2271 peter_e 240 : }
241 :
242 : /*
243 : * Walks the workers array and searches for one that matches given
2208 244 : * subscription id and relid.
245 : *
246 : * We are only interested in the leader apply worker or table sync worker.
247 : */
248 : LogicalRepWorker *
2208 peter_e 249 CBC 2137 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
250 : {
2153 bruce 251 ECB : int i;
2153 bruce 252 GIC 2137 : LogicalRepWorker *res = NULL;
2271 peter_e 253 EUB :
2271 peter_e 254 GIC 2137 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
2208 peter_e 255 EUB :
2271 256 : /* Search for attached worker for a given subscription id. */
2271 peter_e 257 GBC 6794 : for (i = 0; i < max_logical_replication_workers; i++)
2271 peter_e 258 EUB : {
2153 bruce 259 GIC 5922 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
260 :
261 : /* Skip parallel apply workers. */
90 akapila 262 GNC 5922 : if (isParallelApplyWorker(w))
263 1246 : continue;
264 :
2174 peter_e 265 GIC 4676 : if (w->in_use && w->subid == subid && w->relid == relid &&
266 1265 : (!only_running || w->proc))
267 : {
2271 268 1265 : res = w;
2271 peter_e 269 CBC 1265 : break;
270 : }
271 : }
272 :
273 2137 : return res;
274 : }
2271 peter_e 275 ECB :
2074 276 : /*
277 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
278 : * the subscription, instead of just one.
279 : */
280 : List *
2074 peter_e 281 GIC 373 : logicalrep_workers_find(Oid subid, bool only_running)
282 : {
283 : int i;
284 373 : List *res = NIL;
285 :
286 373 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
287 :
2074 peter_e 288 ECB : /* Search for attached worker for a given subscription id. */
2074 peter_e 289 GIC 1957 : for (i = 0; i < max_logical_replication_workers; i++)
290 : {
2074 peter_e 291 CBC 1584 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
292 :
293 1584 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
2074 peter_e 294 GIC 240 : res = lappend(res, w);
295 : }
2074 peter_e 296 ECB :
2074 peter_e 297 GIC 373 : return res;
2074 peter_e 298 ECB : }
299 :
/*
 * Start new logical replication background worker, if possible.
 *
 * dbid, subid, subname and userid describe the subscription the worker will
 * serve.  Passing a valid relid requests a table synchronization worker for
 * that relation.  Passing subworker_dsm != DSM_HANDLE_INVALID requests a
 * parallel apply worker, and the handle is copied into bgw_extra for it.  The
 * two are mutually exclusive, as the Assert below checks.
 *
 * Returns true on success, false on failure.  Failure covers:
 *   - a per-subscription sync or parallel apply worker limit being reached
 *     (silent);
 *   - no free worker slot, or no free background worker slot (WARNING);
 *   - the worker exiting before it attached to its slot.
 * Raises ERROR when max_replication_slots is 0.
 */
bool
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
						 Oid relid, dsm_handle subworker_dsm)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	uint16		generation;
	int			i;
	int			slot = 0;
	LogicalRepWorker *worker = NULL;
	int			nsyncworkers;
	int			nparallelapplyworkers;
	TimestampTz now;
	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);

	/* Sanity check - tablesync worker cannot be a subworker */
	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));

	ereport(DEBUG1,
			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
							 subname)));

	/* Report this after the initial starting message for consistency. */
	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("cannot start logical replication workers when max_replication_slots = 0")));

	/*
	 * We need to do the modification of the shared memory under lock so that
	 * we have consistent view.
	 */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}

	nsyncworkers = logicalrep_sync_worker_count(subid);

	now = GetCurrentTimestamp();

	/*
	 * If we didn't find a free slot, try to do garbage collection. The
	 * reason we do this is because if some worker failed to start up and its
	 * parent has crashed while waiting, the in_use state was never cleared.
	 *
	 * The same collection also runs when the subscription is at its sync
	 * worker limit.  logicalrep_sync_worker_count() counts slots by subid
	 * and relid, so stale slots count toward that limit.
	 */
	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
	{
		bool		did_cleanup = false;

		for (i = 0; i < max_logical_replication_workers; i++)
		{
			LogicalRepWorker *w = &LogicalRepCtx->workers[i];

			/*
			 * If the worker was marked in use but didn't manage to attach in
			 * time, clean it up.  wal_receiver_timeout is used as the attach
			 * deadline here.
			 */
			if (w->in_use && !w->proc &&
				TimestampDifferenceExceeds(w->launch_time, now,
										   wal_receiver_timeout))
			{
				elog(WARNING,
					 "logical replication worker for subscription %u took too long to start; canceled",
					 w->subid);

				logicalrep_worker_cleanup(w);
				did_cleanup = true;
			}
		}

		/* Slots were freed: rescan and recount, still holding the lock. */
		if (did_cleanup)
			goto retry;
	}

	/*
	 * We don't allow to invoke more sync workers once we have reached the
	 * sync worker limit per subscription. So, just return silently as we
	 * might get here because of an otherwise harmless race condition.
	 */
	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	nparallelapplyworkers = logicalrep_pa_worker_count(subid);

	/*
	 * Return false if the number of parallel apply workers reached the limit
	 * per subscription.
	 */
	if (is_parallel_apply_worker &&
		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	/*
	 * However if there are no more free worker slots, inform user about it
	 * before exiting.
	 */
	if (worker == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of logical replication worker slots"),
				 errhint("You might need to increase max_logical_replication_workers.")));
		return false;
	}

	/*
	 * Prepare the worker slot.  proc stays NULL until the new process
	 * attaches.  Bumping generation lets waiters tell this launch apart from
	 * earlier users of the same slot.
	 */
	worker->launch_time = now;
	worker->in_use = true;
	worker->generation++;
	worker->proc = NULL;
	worker->dbid = dbid;
	worker->userid = userid;
	worker->subid = subid;
	worker->relid = relid;
	worker->relstate = SUBREL_STATE_UNKNOWN;
	worker->relstate_lsn = InvalidXLogRecPtr;
	worker->stream_fileset = NULL;
	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
	worker->parallel_apply = is_parallel_apply_worker;
	worker->last_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->last_send_time);
	TIMESTAMP_NOBEGIN(worker->last_recv_time);
	worker->reply_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->reply_time);

	/* Before releasing lock, remember generation for future identification. */
	generation = worker->generation;

	LWLockRelease(LogicalRepWorkerLock);

	/* Register the new dynamic worker. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");

	if (is_parallel_apply_worker)
		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
	else
		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");

	if (OidIsValid(relid))
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication worker for subscription %u sync %u", subid, relid);
	else if (is_parallel_apply_worker)
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication parallel apply worker for subscription %u", subid);
	else
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication apply worker for subscription %u", subid);

	if (is_parallel_apply_worker)
		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
	else
		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");

	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	bgw.bgw_notify_pid = MyProcPid;
	/* NOTE(review): presumably the worker passes this to logicalrep_worker_attach(). */
	bgw.bgw_main_arg = Int32GetDatum(slot);

	if (is_parallel_apply_worker)
		memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));

	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
	{
		/* Failed to start worker, so clean up the worker slot. */
		LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
		Assert(generation == worker->generation);
		logicalrep_worker_cleanup(worker);
		LWLockRelease(LogicalRepWorkerLock);

		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of background worker slots"),
				 errhint("You might need to increase max_worker_processes.")));
		return false;
	}

	/* Now wait until it attaches. */
	return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
2271 peter_e 506 ECB :
507 : /*
508 : * Internal function to stop the worker and wait until it detaches from the
509 : * slot.
510 : */
511 : static void
90 akapila 512 GNC 45 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
2271 peter_e 513 ECB : {
2153 bruce 514 : uint16 generation;
2271 peter_e 515 :
90 akapila 516 GNC 45 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
517 :
2174 peter_e 518 ECB : /*
519 : * Remember which generation was our worker so we can check if what we see
520 : * is still the same one.
521 : */
2174 peter_e 522 CBC 45 : generation = worker->generation;
523 :
2271 peter_e 524 ECB : /*
2108 tgl 525 : * If we found a worker but it does not have proc set then it is still
526 : * starting up; wait for it to finish starting and then kill it.
2271 peter_e 527 : */
2174 peter_e 528 CBC 45 : while (worker->in_use && !worker->proc)
529 : {
2153 bruce 530 ECB : int rc;
531 :
2271 peter_e 532 CBC 2 : LWLockRelease(LogicalRepWorkerLock);
2271 peter_e 533 ECB :
534 : /* Wait a bit --- we don't expect to have to wait long. */
2133 andres 535 CBC 2 : rc = WaitLatch(MyLatch,
1598 tmunro 536 ECB : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
537 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
538 :
2133 andres 539 CBC 2 : if (rc & WL_LATCH_SET)
540 : {
2133 andres 541 UIC 0 : ResetLatch(MyLatch);
2133 andres 542 LBC 0 : CHECK_FOR_INTERRUPTS();
2133 andres 543 ECB : }
544 :
2108 tgl 545 : /* Recheck worker status. */
2271 peter_e 546 GIC 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
2267 peter_e 547 ECB :
548 : /*
2174 549 : * Check whether the worker slot is no longer used, which would mean
550 : * that the worker has exited, or whether the worker generation is
551 : * different, meaning that a different worker has taken the slot.
2267 552 : */
2174 peter_e 553 GIC 2 : if (!worker->in_use || worker->generation != generation)
2267 peter_e 554 UIC 0 : return;
2267 peter_e 555 EUB :
556 : /* Worker has assigned proc, so it has started. */
2267 peter_e 557 GBC 2 : if (worker->proc)
2271 peter_e 558 GIC 2 : break;
2271 peter_e 559 EUB : }
560 :
561 : /* Now terminate the worker ... */
90 akapila 562 GNC 45 : kill(worker->proc->pid, signo);
2271 peter_e 563 EUB :
564 : /* ... and wait for it to die. */
565 : for (;;)
2271 peter_e 566 GIC 55 : {
2153 bruce 567 ECB : int rc;
568 :
569 : /* is it gone? */
2174 peter_e 570 GIC 100 : if (!worker->proc || worker->generation != generation)
571 : break;
572 :
2108 tgl 573 55 : LWLockRelease(LogicalRepWorkerLock);
574 :
2108 tgl 575 ECB : /* Wait a bit --- we don't expect to have to wait long. */
2133 andres 576 GIC 55 : rc = WaitLatch(MyLatch,
577 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
578 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
2271 peter_e 579 ECB :
2133 andres 580 GIC 55 : if (rc & WL_LATCH_SET)
581 : {
582 22 : ResetLatch(MyLatch);
583 22 : CHECK_FOR_INTERRUPTS();
584 : }
2108 tgl 585 ECB :
2108 tgl 586 GIC 55 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
587 : }
588 : }
589 :
590 : /*
591 : * Stop the logical replication worker for subid/relid, if any.
592 : */
593 : void
90 akapila 594 GNC 67 : logicalrep_worker_stop(Oid subid, Oid relid)
595 : {
596 : LogicalRepWorker *worker;
597 :
598 67 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
599 :
600 67 : worker = logicalrep_worker_find(subid, relid, false);
601 :
602 67 : if (worker)
603 : {
604 37 : Assert(!isParallelApplyWorker(worker));
605 37 : logicalrep_worker_stop_internal(worker, SIGTERM);
606 : }
607 :
608 67 : LWLockRelease(LogicalRepWorkerLock);
609 67 : }
610 :
611 : /*
612 : * Stop the logical replication parallel apply worker corresponding to the
613 : * input slot number.
614 : *
615 : * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
616 : * worker so that the worker exits cleanly.
617 : */
618 : void
619 5 : logicalrep_pa_worker_stop(int slot_no, uint16 generation)
620 : {
621 : LogicalRepWorker *worker;
622 :
623 5 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
624 :
625 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
626 :
627 5 : worker = &LogicalRepCtx->workers[slot_no];
628 5 : Assert(isParallelApplyWorker(worker));
629 :
630 : /*
631 : * Only stop the worker if the generation matches and the worker is alive.
632 : */
633 5 : if (worker->generation == generation && worker->proc)
634 5 : logicalrep_worker_stop_internal(worker, SIGINT);
635 :
2108 tgl 636 GIC 5 : LWLockRelease(LogicalRepWorkerLock);
2271 peter_e 637 5 : }
2271 peter_e 638 ECB :
639 : /*
640 : * Wake up (using latch) any logical replication worker for specified sub/rel.
641 : */
2208 642 : void
2208 peter_e 643 GIC 175 : logicalrep_worker_wakeup(Oid subid, Oid relid)
644 : {
2153 bruce 645 ECB : LogicalRepWorker *worker;
646 :
2208 peter_e 647 GIC 175 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
648 :
2208 peter_e 649 CBC 175 : worker = logicalrep_worker_find(subid, relid, true);
650 :
2208 peter_e 651 GBC 175 : if (worker)
652 175 : logicalrep_worker_wakeup_ptr(worker);
653 :
2109 tgl 654 GIC 175 : LWLockRelease(LogicalRepWorkerLock);
2208 peter_e 655 175 : }
2208 peter_e 656 ECB :
657 : /*
658 : * Wake up (using latch) the specified logical replication worker.
659 : *
660 : * Caller must hold lock, else worker->proc could change under us.
661 : */
662 : void
2208 peter_e 663 CBC 520 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
2208 peter_e 664 EUB : {
2109 tgl 665 GIC 520 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
666 :
2208 peter_e 667 CBC 520 : SetLatch(&worker->proc->procLatch);
668 520 : }
669 :
670 : /*
671 : * Attach to a slot.
2271 peter_e 672 ECB : */
673 : void
2271 peter_e 674 GIC 315 : logicalrep_worker_attach(int slot)
675 : {
2271 peter_e 676 ECB : /* Block concurrent access. */
2271 peter_e 677 GIC 315 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
678 :
679 315 : Assert(slot >= 0 && slot < max_logical_replication_workers);
2271 peter_e 680 CBC 315 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
681 :
2174 peter_e 682 GIC 315 : if (!MyLogicalRepWorker->in_use)
2174 peter_e 683 ECB : {
2174 peter_e 684 UIC 0 : LWLockRelease(LogicalRepWorkerLock);
685 0 : ereport(ERROR,
2153 bruce 686 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
687 : errmsg("logical replication worker slot %d is empty, cannot attach",
688 : slot)));
689 : }
2174 peter_e 690 :
2271 peter_e 691 GIC 315 : if (MyLogicalRepWorker->proc)
2174 peter_e 692 ECB : {
2174 peter_e 693 LBC 0 : LWLockRelease(LogicalRepWorkerLock);
2271 peter_e 694 UIC 0 : ereport(ERROR,
695 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2118 tgl 696 ECB : errmsg("logical replication worker slot %d is already used by "
697 : "another worker, cannot attach", slot)));
698 : }
699 :
2271 peter_e 700 GIC 315 : MyLogicalRepWorker->proc = MyProc;
701 315 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
702 :
703 315 : LWLockRelease(LogicalRepWorkerLock);
2271 peter_e 704 CBC 315 : }
705 :
706 : /*
707 : * Stop the parallel apply workers if any, and detach the leader apply worker
708 : * (cleans up the worker info).
2271 peter_e 709 ECB : */
710 : static void
2271 peter_e 711 CBC 315 : logicalrep_worker_detach(void)
712 : {
713 : /* Stop the parallel apply workers. */
90 akapila 714 GNC 315 : if (am_leader_apply_worker())
715 : {
716 : List *workers;
717 : ListCell *lc;
718 :
719 : /*
720 : * Detach from the error_mq_handle for all parallel apply workers
721 : * before terminating them. This prevents the leader apply worker from
722 : * receiving the worker termination message and sending it to logs
723 : * when the same is already done by the parallel worker.
724 : */
725 148 : pa_detach_all_error_mq();
726 :
727 148 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
728 :
729 148 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
730 300 : foreach(lc, workers)
731 : {
732 152 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
733 :
734 152 : if (isParallelApplyWorker(w))
735 3 : logicalrep_worker_stop_internal(w, SIGTERM);
736 : }
737 :
738 148 : LWLockRelease(LogicalRepWorkerLock);
739 : }
740 :
2271 peter_e 741 ECB : /* Block concurrent access. */
2271 peter_e 742 GIC 315 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
2271 peter_e 743 ECB :
2174 peter_e 744 CBC 315 : logicalrep_worker_cleanup(MyLogicalRepWorker);
745 :
2271 peter_e 746 GIC 315 : LWLockRelease(LogicalRepWorkerLock);
2271 peter_e 747 CBC 315 : }
2271 peter_e 748 ECB :
749 : /*
750 : * Clean up worker info.
751 : */
752 : static void
2174 peter_e 753 GIC 315 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
754 : {
755 315 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
756 :
757 315 : worker->in_use = false;
2174 peter_e 758 CBC 315 : worker->proc = NULL;
2174 peter_e 759 GIC 315 : worker->dbid = InvalidOid;
760 315 : worker->userid = InvalidOid;
761 315 : worker->subid = InvalidOid;
2174 peter_e 762 CBC 315 : worker->relid = InvalidOid;
81 akapila 763 GNC 315 : worker->leader_pid = InvalidPid;
90 764 315 : worker->parallel_apply = false;
2174 peter_e 765 GIC 315 : }
2174 peter_e 766 ECB :
767 : /*
2181 fujii 768 : * Cleanup function for logical replication launcher.
769 : *
770 : * Called on logical replication launcher exit.
771 : */
772 : static void
2181 fujii 773 GIC 322 : logicalrep_launcher_onexit(int code, Datum arg)
2181 fujii 774 ECB : {
2181 fujii 775 CBC 322 : LogicalRepCtx->launcher_pid = 0;
2181 fujii 776 GIC 322 : }
2181 fujii 777 ECB :
2271 peter_e 778 : /*
779 : * Cleanup function.
780 : *
781 : * Called on logical replication worker exit.
782 : */
783 : static void
2271 peter_e 784 CBC 315 : logicalrep_worker_onexit(int code, Datum arg)
785 : {
786 : /* Disconnect gracefully from the remote side. */
697 alvherre 787 GIC 315 : if (LogRepWorkerWalRcvConn)
697 alvherre 788 CBC 289 : walrcv_disconnect(LogRepWorkerWalRcvConn);
789 :
2271 peter_e 790 315 : logicalrep_worker_detach();
791 :
584 akapila 792 ECB : /* Cleanup fileset used for streaming transactions. */
584 akapila 793 CBC 315 : if (MyLogicalRepWorker->stream_fileset != NULL)
584 akapila 794 GIC 14 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
587 akapila 795 ECB :
796 : /*
797 : * Session level locks may be acquired outside of a transaction in
798 : * parallel apply mode and will not be released when the worker
799 : * terminates, so manually release all locks before the worker exits.
800 : */
90 akapila 801 GNC 315 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
802 :
2138 peter_e 803 CBC 315 : ApplyLauncherWakeup();
2271 peter_e 804 GIC 315 : }
805 :
806 : /*
807 : * Count the number of registered (not necessarily running) sync workers
808 : * for a subscription.
809 : */
810 : int
2208 peter_e 811 CBC 977 : logicalrep_sync_worker_count(Oid subid)
812 : {
2153 bruce 813 ECB : int i;
2153 bruce 814 GIC 977 : int res = 0;
2208 peter_e 815 ECB :
2208 peter_e 816 CBC 977 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
817 :
818 : /* Search for attached worker for a given subscription id. */
2208 peter_e 819 GIC 5049 : for (i = 0; i < max_logical_replication_workers; i++)
820 : {
2153 bruce 821 4072 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
2153 bruce 822 ECB :
2208 peter_e 823 GIC 4072 : if (w->subid == subid && OidIsValid(w->relid))
824 1208 : res++;
2208 peter_e 825 ECB : }
826 :
2208 peter_e 827 CBC 977 : return res;
2208 peter_e 828 ECB : }
829 :
830 : /*
831 : * Count the number of registered (but not necessarily running) parallel apply
832 : * workers for a subscription.
833 : */
834 : static int
90 akapila 835 GNC 274 : logicalrep_pa_worker_count(Oid subid)
836 : {
837 : int i;
838 274 : int res = 0;
839 :
840 274 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
841 :
842 : /*
843 : * Scan all attached parallel apply workers, only counting those which
844 : * have the given subscription id.
845 : */
846 1458 : for (i = 0; i < max_logical_replication_workers; i++)
847 : {
848 1184 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
849 :
850 1184 : if (w->subid == subid && isParallelApplyWorker(w))
851 2 : res++;
852 : }
853 :
854 274 : return res;
855 : }
856 :
2271 peter_e 857 ECB : /*
858 : * ApplyLauncherShmemSize
2271 peter_e 859 EUB : * Compute space needed for replication launcher shared memory
860 : */
861 : Size
2271 peter_e 862 GIC 6390 : ApplyLauncherShmemSize(void)
863 : {
864 : Size size;
865 :
2271 peter_e 866 ECB : /*
867 : * Need the fixed struct and the array of LogicalRepWorker.
2271 peter_e 868 EUB : */
2271 peter_e 869 GBC 6390 : size = sizeof(LogicalRepCtxStruct);
2271 peter_e 870 GIC 6390 : size = MAXALIGN(size);
871 6390 : size = add_size(size, mul_size(max_logical_replication_workers,
872 : sizeof(LogicalRepWorker)));
873 6390 : return size;
874 : }
2271 peter_e 875 ECB :
2184 tgl 876 : /*
877 : * ApplyLauncherRegister
878 : * Register a background worker running the logical replication launcher.
879 : */
880 : void
2271 peter_e 881 GIC 598 : ApplyLauncherRegister(void)
882 : {
883 : BackgroundWorker bgw;
884 :
885 598 : if (max_logical_replication_workers == 0)
2271 peter_e 886 LBC 0 : return;
887 :
2184 tgl 888 GIC 598 : memset(&bgw, 0, sizeof(bgw));
2153 bruce 889 CBC 598 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
890 : BGWORKER_BACKEND_DATABASE_CONNECTION;
2271 peter_e 891 GIC 598 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
2200 rhaas 892 598 : snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
893 598 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
2271 peter_e 894 598 : snprintf(bgw.bgw_name, BGW_MAXLEN,
895 : "logical replication launcher");
2047 896 598 : snprintf(bgw.bgw_type, BGW_MAXLEN,
897 : "logical replication launcher");
2271 898 598 : bgw.bgw_restart_time = 5;
899 598 : bgw.bgw_notify_pid = 0;
2271 peter_e 900 CBC 598 : bgw.bgw_main_arg = (Datum) 0;
901 :
902 598 : RegisterBackgroundWorker(&bgw);
903 : }
2271 peter_e 904 ECB :
905 : /*
906 : * ApplyLauncherShmemInit
907 : * Allocate and initialize replication launcher shared memory
908 : */
909 : void
2271 peter_e 910 CBC 1826 : ApplyLauncherShmemInit(void)
911 : {
912 : bool found;
2271 peter_e 913 ECB :
2271 peter_e 914 GIC 1826 : LogicalRepCtx = (LogicalRepCtxStruct *)
915 1826 : ShmemInitStruct("Logical Replication Launcher Data",
916 : ApplyLauncherShmemSize(),
2271 peter_e 917 ECB : &found);
918 :
2271 peter_e 919 CBC 1826 : if (!found)
920 : {
2153 bruce 921 ECB : int slot;
2208 peter_e 922 :
2271 peter_e 923 GIC 1826 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
924 :
74 tgl 925 GNC 1826 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
926 1826 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
927 :
928 : /* Initialize memory and spin locks for each worker slot. */
2208 peter_e 929 GIC 9134 : for (slot = 0; slot < max_logical_replication_workers; slot++)
930 : {
2208 peter_e 931 CBC 7308 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
932 :
933 7308 : memset(worker, 0, sizeof(LogicalRepWorker));
2208 peter_e 934 GIC 7308 : SpinLockInit(&worker->relmutex);
2208 peter_e 935 ECB : }
936 : }
2271 peter_e 937 CBC 1826 : }
2271 peter_e 938 ECB :
939 : /*
940 : * Initialize or attach to the dynamic shared hash table that stores the
941 : * last-start times, if not already done.
942 : * This must be called before accessing the table.
943 : */
944 : static void
77 tgl 945 GNC 358 : logicalrep_launcher_attach_dshmem(void)
946 : {
947 : MemoryContext oldcontext;
948 :
949 : /* Quick exit if we already did this. */
74 950 358 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
77 951 320 : last_start_times != NULL)
952 241 : return;
953 :
954 : /* Otherwise, use a lock to ensure only one process creates the table. */
955 117 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
956 :
957 : /* Be sure any local memory allocated by DSA routines is persistent. */
958 117 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
959 :
74 960 117 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
961 : {
962 : /* Initialize dynamic shared hash table for last-start times. */
77 963 38 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
964 38 : dsa_pin(last_start_times_dsa);
965 38 : dsa_pin_mapping(last_start_times_dsa);
966 38 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
967 :
968 : /* Store handles in shared memory for other backends to use. */
969 38 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
970 38 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
971 : }
972 79 : else if (!last_start_times)
973 : {
974 : /* Attach to existing dynamic shared hash table. */
975 79 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
976 79 : dsa_pin_mapping(last_start_times_dsa);
977 79 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
978 79 : LogicalRepCtx->last_start_dsh, 0);
979 : }
980 :
981 117 : MemoryContextSwitchTo(oldcontext);
982 117 : LWLockRelease(LogicalRepWorkerLock);
983 : }
984 :
985 : /*
986 : * Set the last-start time for the subscription.
987 : */
988 : static void
989 111 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
990 : {
991 : LauncherLastStartTimesEntry *entry;
992 : bool found;
993 :
994 111 : logicalrep_launcher_attach_dshmem();
995 :
996 111 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
997 111 : entry->last_start_time = start_time;
998 111 : dshash_release_lock(last_start_times, entry);
999 111 : }
1000 :
1001 : /*
1002 : * Return the last-start time for the subscription, or 0 if there isn't one.
1003 : */
1004 : static TimestampTz
1005 139 : ApplyLauncherGetWorkerStartTime(Oid subid)
1006 : {
1007 : LauncherLastStartTimesEntry *entry;
1008 : TimestampTz ret;
1009 :
1010 139 : logicalrep_launcher_attach_dshmem();
1011 :
1012 139 : entry = dshash_find(last_start_times, &subid, false);
1013 139 : if (entry == NULL)
1014 93 : return 0;
1015 :
1016 46 : ret = entry->last_start_time;
1017 46 : dshash_release_lock(last_start_times, entry);
1018 :
1019 46 : return ret;
1020 : }
1021 :
1022 : /*
1023 : * Remove the last-start-time entry for the subscription, if one exists.
1024 : *
1025 : * This has two use-cases: to remove the entry related to a subscription
1026 : * that's been deleted or disabled (just to avoid leaking shared memory),
1027 : * and to allow immediate restart of an apply worker that has exited
1028 : * due to subscription parameter changes.
1029 : */
1030 : void
1031 108 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1032 : {
1033 108 : logicalrep_launcher_attach_dshmem();
1034 :
1035 108 : (void) dshash_delete_key(last_start_times, &subid);
1036 108 : }
1037 :
2271 peter_e 1038 ECB : /*
1039 : * Wakeup the launcher on commit if requested.
1040 : */
1041 : void
2169 peter_e 1042 CBC 485915 : AtEOXact_ApplyLauncher(bool isCommit)
1043 : {
2074 peter_e 1044 GIC 485915 : if (isCommit)
1045 : {
1046 465429 : if (on_commit_launcher_wakeup)
1047 83 : ApplyLauncherWakeup();
1048 : }
1049 :
2169 peter_e 1050 CBC 485915 : on_commit_launcher_wakeup = false;
2271 peter_e 1051 GIC 485915 : }
2271 peter_e 1052 ECB :
1053 : /*
1054 : * Request wakeup of the launcher on commit of the transaction.
1055 : *
1056 : * This is used to send launcher signal to stop sleeping and process the
1057 : * subscriptions when current transaction commits. Should be used when new
1058 : * tuple was added to the pg_subscription catalog.
1059 : */
1060 : void
2271 peter_e 1061 CBC 83 : ApplyLauncherWakeupAtCommit(void)
1062 : {
2253 heikki.linnakangas 1063 GIC 83 : if (!on_commit_launcher_wakeup)
2253 heikki.linnakangas 1064 CBC 83 : on_commit_launcher_wakeup = true;
2271 peter_e 1065 83 : }
1066 :
2181 fujii 1067 ECB : static void
2271 peter_e 1068 GIC 398 : ApplyLauncherWakeup(void)
1069 : {
2181 fujii 1070 CBC 398 : if (LogicalRepCtx->launcher_pid != 0)
2271 peter_e 1071 381 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
2271 peter_e 1072 GIC 398 : }
1073 :
1074 : /*
1075 : * Main loop for the apply launcher process.
1076 : */
1077 : void
2271 peter_e 1078 CBC 322 : ApplyLauncherMain(Datum main_arg)
1079 : {
2221 tgl 1080 GIC 322 : ereport(DEBUG1,
1081 : (errmsg_internal("logical replication launcher started")));
1082 :
2181 fujii 1083 322 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1084 :
2131 andres 1085 322 : Assert(LogicalRepCtx->launcher_pid == 0);
2131 andres 1086 CBC 322 : LogicalRepCtx->launcher_pid = MyProcPid;
1087 :
1088 : /* Establish signal handlers. */
996 akapila 1089 322 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
2131 andres 1090 GIC 322 : pqsignal(SIGTERM, die);
2271 peter_e 1091 CBC 322 : BackgroundWorkerUnblockSignals();
1092 :
1093 : /*
2271 peter_e 1094 ECB : * Establish connection to nailed catalogs (we only ever access
1095 : * pg_subscription).
1096 : */
1830 magnus 1097 GIC 322 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
2271 peter_e 1098 ECB :
1099 : /* Enter main loop */
1100 : for (;;)
2271 peter_e 1101 GIC 1715 : {
2271 peter_e 1102 ECB : int rc;
1103 : List *sublist;
1104 : ListCell *lc;
1105 : MemoryContext subctx;
1106 : MemoryContext oldctx;
2153 bruce 1107 GIC 2037 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1108 :
2131 andres 1109 CBC 2037 : CHECK_FOR_INTERRUPTS();
1110 :
1111 : /* Use temporary context to avoid leaking memory across cycles. */
77 tgl 1112 GNC 2036 : subctx = AllocSetContextCreate(TopMemoryContext,
1113 : "Logical Replication Launcher sublist",
1114 : ALLOCSET_DEFAULT_SIZES);
1115 2036 : oldctx = MemoryContextSwitchTo(subctx);
2271 peter_e 1116 ECB :
1117 : /* Start any missing workers for enabled subscriptions. */
77 tgl 1118 GNC 2036 : sublist = get_subscription_list();
1119 2412 : foreach(lc, sublist)
1120 : {
1121 376 : Subscription *sub = (Subscription *) lfirst(lc);
1122 : LogicalRepWorker *w;
1123 : TimestampTz last_start;
1124 : TimestampTz now;
1125 : long elapsed;
2271 peter_e 1126 ECB :
77 tgl 1127 GNC 376 : if (!sub->enabled)
1128 22 : continue;
2063 peter_e 1129 ECB :
77 tgl 1130 GNC 354 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1131 354 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1132 354 : LWLockRelease(LogicalRepWorkerLock);
1133 :
1134 354 : if (w != NULL)
1135 215 : continue; /* worker is running already */
1136 :
1137 : /*
1138 : * If the worker is eligible to start now, launch it. Otherwise,
1139 : * adjust wait_time so that we'll wake up as soon as it can be
1140 : * started.
1141 : *
1142 : * Each subscription's apply worker can only be restarted once per
1143 : * wal_retrieve_retry_interval, so that errors do not cause us to
1144 : * repeatedly restart the worker as fast as possible. In cases
1145 : * where a restart is expected (e.g., subscription parameter
1146 : * changes), another process should remove the last-start entry
1147 : * for the subscription so that the worker can be restarted
1148 : * without waiting for wal_retrieve_retry_interval to elapse.
1149 : */
1150 139 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1151 139 : now = GetCurrentTimestamp();
1152 139 : if (last_start == 0 ||
1153 46 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1154 : {
1155 111 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1156 111 : logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
1157 : sub->owner, InvalidOid,
1158 : DSM_HANDLE_INVALID);
1159 : }
1160 : else
1161 : {
1162 28 : wait_time = Min(wait_time,
1163 : wal_retrieve_retry_interval - elapsed);
1164 : }
2271 peter_e 1165 ECB : }
1166 :
1167 : /* Switch back to original memory context. */
77 tgl 1168 GNC 2036 : MemoryContextSwitchTo(oldctx);
1169 : /* Clean the temporary memory. */
1170 2036 : MemoryContextDelete(subctx);
1171 :
2271 peter_e 1172 ECB : /* Wait for more work. */
2133 andres 1173 CBC 2036 : rc = WaitLatch(MyLatch,
1598 tmunro 1174 ECB : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
2271 peter_e 1175 : wait_time,
1176 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1177 :
2133 andres 1178 GIC 2033 : if (rc & WL_LATCH_SET)
2133 andres 1179 ECB : {
2133 andres 1180 CBC 2023 : ResetLatch(MyLatch);
1181 2023 : CHECK_FOR_INTERRUPTS();
1182 : }
2133 andres 1183 ECB :
1209 rhaas 1184 GIC 1715 : if (ConfigReloadPending)
1185 : {
1186 25 : ConfigReloadPending = false;
2190 peter_e 1187 25 : ProcessConfigFile(PGC_SIGHUP);
1188 : }
1189 : }
1190 :
2131 andres 1191 ECB : /* Not reachable */
1192 : }
1193 :
1194 : /*
1195 : * Is current process the logical replication launcher?
1196 : */
1197 : bool
2131 andres 1198 GIC 336 : IsLogicalLauncher(void)
1199 : {
2131 andres 1200 CBC 336 : return LogicalRepCtx->launcher_pid == MyProcPid;
1201 : }
1202 :
1203 : /*
1204 : * Return the pid of the leader apply worker if the given pid is the pid of a
1205 : * parallel apply worker, otherwise, return InvalidPid.
1206 : */
1207 : pid_t
81 akapila 1208 GNC 580 : GetLeaderApplyWorkerPid(pid_t pid)
1209 : {
1210 580 : int leader_pid = InvalidPid;
1211 : int i;
1212 :
1213 580 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1214 :
1215 2900 : for (i = 0; i < max_logical_replication_workers; i++)
1216 : {
1217 2320 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1218 :
1219 2320 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1220 : {
81 akapila 1221 UNC 0 : leader_pid = w->leader_pid;
1222 0 : break;
1223 : }
1224 : }
1225 :
81 akapila 1226 GNC 580 : LWLockRelease(LogicalRepWorkerLock);
1227 :
1228 580 : return leader_pid;
1229 : }
1230 :
1231 : /*
2271 peter_e 1232 ECB : * Returns state of the subscriptions.
1233 : */
1234 : Datum
2271 peter_e 1235 CBC 1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1236 : {
1237 : #define PG_STAT_GET_SUBSCRIPTION_COLS 9
1238 1 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1239 : int i;
1240 1 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1241 :
173 michael 1242 1 : InitMaterializedSRF(fcinfo, 0);
2271 peter_e 1243 ECB :
1244 : /* Make sure we get consistent view of the workers. */
2271 peter_e 1245 GIC 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
2271 peter_e 1246 ECB :
306 tgl 1247 GIC 5 : for (i = 0; i < max_logical_replication_workers; i++)
1248 : {
1249 : /* for each row */
267 peter 1250 GNC 4 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1251 4 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1252 : int worker_pid;
1253 : LogicalRepWorker worker;
2271 peter_e 1254 ECB :
2271 peter_e 1255 GIC 4 : memcpy(&worker, &LogicalRepCtx->workers[i],
1256 : sizeof(LogicalRepWorker));
1257 4 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1258 2 : continue;
2271 peter_e 1259 ECB :
2271 peter_e 1260 CBC 2 : if (OidIsValid(subid) && worker.subid != subid)
2271 peter_e 1261 LBC 0 : continue;
1262 :
2271 peter_e 1263 GIC 2 : worker_pid = worker.proc->pid;
2271 peter_e 1264 ECB :
2271 peter_e 1265 GIC 2 : values[0] = ObjectIdGetDatum(worker.subid);
2208 peter_e 1266 CBC 2 : if (OidIsValid(worker.relid))
2208 peter_e 1267 UIC 0 : values[1] = ObjectIdGetDatum(worker.relid);
1268 : else
2208 peter_e 1269 CBC 2 : nulls[1] = true;
1270 2 : values[2] = Int32GetDatum(worker_pid);
1271 :
81 akapila 1272 GNC 2 : if (isParallelApplyWorker(&worker))
81 akapila 1273 UNC 0 : values[3] = Int32GetDatum(worker.leader_pid);
1274 : else
2208 peter_e 1275 GNC 2 : nulls[3] = true;
1276 :
81 akapila 1277 2 : if (XLogRecPtrIsInvalid(worker.last_lsn))
81 akapila 1278 CBC 1 : nulls[4] = true;
1279 : else
81 akapila 1280 GNC 1 : values[4] = LSNGetDatum(worker.last_lsn);
2271 peter_e 1281 2 : if (worker.last_send_time == 0)
81 akapila 1282 UIC 0 : nulls[5] = true;
2271 peter_e 1283 ECB : else
81 akapila 1284 GNC 2 : values[5] = TimestampTzGetDatum(worker.last_send_time);
2271 peter_e 1285 2 : if (worker.last_recv_time == 0)
81 akapila 1286 LBC 0 : nulls[6] = true;
1287 : else
81 akapila 1288 GNC 2 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
2271 peter_e 1289 2 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
81 akapila 1290 CBC 1 : nulls[7] = true;
1291 : else
81 akapila 1292 GNC 1 : values[7] = LSNGetDatum(worker.reply_lsn);
2271 peter_e 1293 2 : if (worker.reply_time == 0)
81 akapila 1294 UNC 0 : nulls[8] = true;
1295 : else
81 akapila 1296 GNC 2 : values[8] = TimestampTzGetDatum(worker.reply_time);
1297 :
398 michael 1298 GIC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1299 : values, nulls);
1300 :
2153 bruce 1301 ECB : /*
1302 : * If only a single subscription was requested, and we found it,
1303 : * break.
1304 : */
2271 peter_e 1305 GIC 2 : if (OidIsValid(subid))
2271 peter_e 1306 LBC 0 : break;
1307 : }
2271 peter_e 1308 ECB :
2271 peter_e 1309 CBC 1 : LWLockRelease(LogicalRepWorkerLock);
2271 peter_e 1310 ECB :
2271 peter_e 1311 CBC 1 : return (Datum) 0;
1312 : }
|