LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 89.7 % 445 399 10 12 23 1 12 164 142 81 30 266 3 40
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 31 31 22 9 29 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  * launcher.c
       3                 :  *     PostgreSQL logical replication worker launcher process
       4                 :  *
       5                 :  * Copyright (c) 2016-2023, PostgreSQL Global Development Group
       6                 :  *
       7                 :  * IDENTIFICATION
       8                 :  *    src/backend/replication/logical/launcher.c
       9                 :  *
      10                 :  * NOTES
      11                 :  *    This module contains the logical replication worker launcher which
      12                 :  *    uses the background worker infrastructure to start the logical
      13                 :  *    replication workers for every enabled subscription.
      14                 :  *
      15                 :  *-------------------------------------------------------------------------
      16                 :  */
      17                 : 
      18                 : #include "postgres.h"
      19                 : 
      20                 : #include "access/heapam.h"
      21                 : #include "access/htup.h"
      22                 : #include "access/htup_details.h"
      23                 : #include "access/tableam.h"
      24                 : #include "access/xact.h"
      25                 : #include "catalog/pg_subscription.h"
      26                 : #include "catalog/pg_subscription_rel.h"
      27                 : #include "funcapi.h"
      28                 : #include "lib/dshash.h"
      29                 : #include "libpq/pqsignal.h"
      30                 : #include "miscadmin.h"
      31                 : #include "pgstat.h"
      32                 : #include "postmaster/bgworker.h"
      33                 : #include "postmaster/fork_process.h"
      34                 : #include "postmaster/interrupt.h"
      35                 : #include "postmaster/postmaster.h"
      36                 : #include "replication/logicallauncher.h"
      37                 : #include "replication/logicalworker.h"
      38                 : #include "replication/slot.h"
      39                 : #include "replication/walreceiver.h"
      40                 : #include "replication/worker_internal.h"
      41                 : #include "storage/ipc.h"
      42                 : #include "storage/proc.h"
      43                 : #include "storage/procarray.h"
      44                 : #include "storage/procsignal.h"
      45                 : #include "tcop/tcopprot.h"
      46                 : #include "utils/builtins.h"
      47                 : #include "utils/memutils.h"
      48                 : #include "utils/pg_lsn.h"
      49                 : #include "utils/ps_status.h"
      50                 : #include "utils/snapmgr.h"
      51                 : #include "utils/timeout.h"
      52                 : 
      53                 : /* max sleep time between cycles (3min) */
      54                 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      55                 : 
      56                 : /* GUC variables */
      57                 : int         max_logical_replication_workers = 4;
      58                 : int         max_sync_workers_per_subscription = 2;
      59                 : int         max_parallel_apply_workers_per_subscription = 2;
      60                 : 
      61                 : LogicalRepWorker *MyLogicalRepWorker = NULL;
      62                 : 
      63                 : typedef struct LogicalRepCtxStruct
      64                 : {
      65                 :     /* Supervisor process. */
      66                 :     pid_t       launcher_pid;
      67                 : 
      68                 :     /* Hash table holding last start times of subscriptions' apply workers. */
      69                 :     dsa_handle  last_start_dsa;
      70                 :     dshash_table_handle last_start_dsh;
      71                 : 
      72                 :     /* Background workers. */
      73                 :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      74                 : } LogicalRepCtxStruct;
      75                 : 
      76                 : static LogicalRepCtxStruct *LogicalRepCtx;
      77                 : 
      78                 : /* an entry in the last-start-times shared hash table */
      79                 : typedef struct LauncherLastStartTimesEntry
      80                 : {
      81                 :     Oid         subid;          /* OID of logrep subscription (hash key) */
      82                 :     TimestampTz last_start_time;    /* last time its apply worker was started */
      83                 : } LauncherLastStartTimesEntry;
      84                 : 
      85                 : /* parameters for the last-start-times shared hash table */
      86                 : static const dshash_parameters dsh_params = {
      87                 :     sizeof(Oid),
      88                 :     sizeof(LauncherLastStartTimesEntry),
      89                 :     dshash_memcmp,
      90                 :     dshash_memhash,
      91                 :     LWTRANCHE_LAUNCHER_HASH
      92                 : };
      93                 : 
      94                 : static dsa_area *last_start_times_dsa = NULL;
      95                 : static dshash_table *last_start_times = NULL;
      96                 : 
      97                 : static bool on_commit_launcher_wakeup = false;
      98                 : 
      99                 : 
     100                 : static void ApplyLauncherWakeup(void);
     101                 : static void logicalrep_launcher_onexit(int code, Datum arg);
     102                 : static void logicalrep_worker_onexit(int code, Datum arg);
     103                 : static void logicalrep_worker_detach(void);
     104                 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
     105                 : static int  logicalrep_pa_worker_count(Oid subid);
     106                 : static void logicalrep_launcher_attach_dshmem(void);
     107                 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     108                 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     109                 : 
     110                 : 
     111                 : /*
     112                 :  * Load the list of subscriptions.
     113                 :  *
     114                 :  * Only the fields interesting for worker start/stop functions are filled for
     115                 :  * each subscription.
     116                 :  */
     117                 : static List *
     118 GIC        2036 : get_subscription_list(void)
     119                 : {
     120            2036 :     List       *res = NIL;
     121                 :     Relation    rel;
     122                 :     TableScanDesc scan;
     123                 :     HeapTuple   tup;
     124                 :     MemoryContext resultcxt;
     125                 : 
     126                 :     /* This is the context that we will allocate our output data in */
     127            2036 :     resultcxt = CurrentMemoryContext;
     128                 : 
     129                 :     /*
     130                 :      * Start a transaction so we can access pg_database, and get a snapshot.
     131                 :      * We don't have a use for the snapshot itself, but we're interested in
     132                 :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     133                 :      * for anything that reads heap pages, because HOT may decide to prune
     134                 :      * them even if the process doesn't attempt to modify any tuples.)
     135                 :      *
     136                 :      * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
     137                 :      * not pushed/active does not reliably prevent HOT pruning (->xmin could
     138                 :      * e.g. be cleared when cache invalidations are processed).
     139                 :      */
     140            2036 :     StartTransactionCommand();
     141            2036 :     (void) GetTransactionSnapshot();
     142                 : 
     143            2036 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     144            2036 :     scan = table_beginscan_catalog(rel, 0, NULL);
     145                 : 
     146            2412 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     147                 :     {
     148             376 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     149 ECB             :         Subscription *sub;
     150                 :         MemoryContext oldcxt;
     151                 : 
     152                 :         /*
     153                 :          * Allocate our results in the caller's context, not the
     154                 :          * transaction's. We do this inside the loop, and restore the original
     155                 :          * context at the end, so that leaky things like heap_getnext() are
     156                 :          * not called in a potentially long-lived context.
     157                 :          */
     158 CBC         376 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     159                 : 
     160 GIC         376 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     161             376 :         sub->oid = subform->oid;
     162             376 :         sub->dbid = subform->subdbid;
     163             376 :         sub->owner = subform->subowner;
     164             376 :         sub->enabled = subform->subenabled;
     165             376 :         sub->name = pstrdup(NameStr(subform->subname));
     166                 :         /* We don't fill fields we are not interested in. */
     167                 : 
     168             376 :         res = lappend(res, sub);
     169             376 :         MemoryContextSwitchTo(oldcxt);
     170                 :     }
     171 ECB             : 
     172 CBC        2036 :     table_endscan(scan);
     173 GIC        2036 :     table_close(rel, AccessShareLock);
     174 ECB             : 
     175 CBC        2036 :     CommitTransactionCommand();
     176                 : 
     177            2036 :     return res;
     178                 : }
     179 ECB             : 
     180                 : /*
     181                 :  * Wait for a background worker to start up and attach to the shmem context.
     182                 :  *
     183                 :  * This is only needed for cleaning up the shared memory in case the worker
     184                 :  * fails to attach.
     185                 :  *
     186                 :  * Returns whether the attach was successful.
     187                 :  */
     188                 : static bool
     189 GIC         274 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     190                 :                                uint16 generation,
     191 ECB             :                                BackgroundWorkerHandle *handle)
     192                 : {
     193                 :     BgwHandleStatus status;
     194                 :     int         rc;
     195                 : 
     196                 :     for (;;)
     197 CBC         774 :     {
     198 ECB             :         pid_t       pid;
     199                 : 
     200 GIC        1048 :         CHECK_FOR_INTERRUPTS();
     201 ECB             : 
     202 CBC        1048 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     203                 : 
     204                 :         /* Worker either died or has started. Return false if died. */
     205            1048 :         if (!worker->in_use || worker->proc)
     206 ECB             :         {
     207 GIC         274 :             LWLockRelease(LogicalRepWorkerLock);
     208 GNC         274 :             return worker->in_use;
     209                 :         }
     210 ECB             : 
     211 GIC         774 :         LWLockRelease(LogicalRepWorkerLock);
     212                 : 
     213                 :         /* Check if worker has died before attaching, and clean up after it. */
     214             774 :         status = GetBackgroundWorkerPid(handle, &pid);
     215                 : 
     216             774 :         if (status == BGWH_STOPPED)
     217                 :         {
     218 UIC           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     219                 :             /* Ensure that this was indeed the worker we waited for. */
     220               0 :             if (generation == worker->generation)
     221               0 :                 logicalrep_worker_cleanup(worker);
     222 LBC           0 :             LWLockRelease(LogicalRepWorkerLock);
     223 UNC           0 :             return false;
     224                 :         }
     225                 : 
     226                 :         /*
     227                 :          * We need timeout because we generally don't get notified via latch
     228                 :          * about the worker attach.  But we don't expect to have to wait long.
     229                 :          */
     230 CBC         774 :         rc = WaitLatch(MyLatch,
     231                 :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     232                 :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     233 ECB             : 
     234 GIC         774 :         if (rc & WL_LATCH_SET)
     235 ECB             :         {
     236 GIC         296 :             ResetLatch(MyLatch);
     237             296 :             CHECK_FOR_INTERRUPTS();
     238 ECB             :         }
     239                 :     }
     240                 : }
     241                 : 
     242                 : /*
     243                 :  * Walks the workers array and searches for one that matches given
     244                 :  * subscription id and relid.
     245                 :  *
     246                 :  * We are only interested in the leader apply worker or table sync worker.
     247                 :  */
     248                 : LogicalRepWorker *
     249 CBC        2137 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     250                 : {
     251 ECB             :     int         i;
     252 GIC        2137 :     LogicalRepWorker *res = NULL;
     253 EUB             : 
     254 GIC        2137 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     255 EUB             : 
     256                 :     /* Search for attached worker for a given subscription id. */
     257 GBC        6794 :     for (i = 0; i < max_logical_replication_workers; i++)
     258 EUB             :     {
     259 GIC        5922 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     260                 : 
     261                 :         /* Skip parallel apply workers. */
     262 GNC        5922 :         if (isParallelApplyWorker(w))
     263            1246 :             continue;
     264                 : 
     265 GIC        4676 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     266            1265 :             (!only_running || w->proc))
     267                 :         {
     268            1265 :             res = w;
     269 CBC        1265 :             break;
     270                 :         }
     271                 :     }
     272                 : 
     273            2137 :     return res;
     274                 : }
     275 ECB             : 
     276                 : /*
     277                 :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     278                 :  * the subscription, instead of just one.
     279                 :  */
     280                 : List *
     281 GIC         373 : logicalrep_workers_find(Oid subid, bool only_running)
     282                 : {
     283                 :     int         i;
     284             373 :     List       *res = NIL;
     285                 : 
     286             373 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     287                 : 
     288 ECB             :     /* Search for attached worker for a given subscription id. */
     289 GIC        1957 :     for (i = 0; i < max_logical_replication_workers; i++)
     290                 :     {
     291 CBC        1584 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     292                 : 
     293            1584 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     294 GIC         240 :             res = lappend(res, w);
     295                 :     }
     296 ECB             : 
     297 GIC         373 :     return res;
     298 ECB             : }
     299                 : 
     300                 : /*
     301                 :  * Start new logical replication background worker, if possible.
     302                 :  *
     303                 :  * Returns true on success, false on failure.
     304                 :  */
     305                 : bool
     306 CBC         274 : logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
     307                 :                          Oid relid, dsm_handle subworker_dsm)
     308                 : {
     309 ECB             :     BackgroundWorker bgw;
     310                 :     BackgroundWorkerHandle *bgw_handle;
     311                 :     uint16      generation;
     312                 :     int         i;
     313 GIC         274 :     int         slot = 0;
     314 CBC         274 :     LogicalRepWorker *worker = NULL;
     315                 :     int         nsyncworkers;
     316                 :     int         nparallelapplyworkers;
     317                 :     TimestampTz now;
     318 GNC         274 :     bool        is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
     319                 : 
     320                 :     /* Sanity check - tablesync worker cannot be a subworker */
     321             274 :     Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
     322                 : 
     323 GIC         274 :     ereport(DEBUG1,
     324                 :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     325                 :                              subname)));
     326                 : 
     327 ECB             :     /* Report this after the initial starting message for consistency. */
     328 GIC         274 :     if (max_replication_slots == 0)
     329 UIC           0 :         ereport(ERROR,
     330 ECB             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     331                 :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
     332                 : 
     333                 :     /*
     334                 :      * We need to do the modification of the shared memory under lock so that
     335                 :      * we have consistent view.
     336                 :      */
     337 CBC         274 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     338                 : 
     339             274 : retry:
     340 ECB             :     /* Find unused worker slot. */
     341 GIC         503 :     for (i = 0; i < max_logical_replication_workers; i++)
     342                 :     {
     343 CBC         503 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     344                 : 
     345 GIC         503 :         if (!w->in_use)
     346                 :         {
     347             274 :             worker = w;
     348             274 :             slot = i;
     349             274 :             break;
     350                 :         }
     351                 :     }
     352 ECB             : 
     353 GIC         274 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     354                 : 
     355             274 :     now = GetCurrentTimestamp();
     356                 : 
     357                 :     /*
     358                 :      * If we didn't find a free slot, try to do garbage collection.  The
     359 ECB             :      * reason we do this is because if some worker failed to start up and its
     360                 :      * parent has crashed while waiting, the in_use state was never cleared.
     361                 :      */
     362 GIC         274 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     363                 :     {
     364 LBC           0 :         bool        did_cleanup = false;
     365                 : 
     366 UIC           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     367 ECB             :         {
     368 UIC           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     369 ECB             : 
     370                 :             /*
     371                 :              * If the worker was marked in use but didn't manage to attach in
     372                 :              * time, clean it up.
     373                 :              */
     374 LBC           0 :             if (w->in_use && !w->proc &&
     375 UBC           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     376                 :                                            wal_receiver_timeout))
     377                 :             {
     378 UIC           0 :                 elog(WARNING,
     379                 :                      "logical replication worker for subscription %u took too long to start; canceled",
     380                 :                      w->subid);
     381                 : 
     382               0 :                 logicalrep_worker_cleanup(w);
     383 LBC           0 :                 did_cleanup = true;
     384                 :             }
     385 ECB             :         }
     386                 : 
     387 LBC           0 :         if (did_cleanup)
     388 UIC           0 :             goto retry;
     389 ECB             :     }
     390                 : 
     391                 :     /*
     392                 :      * We don't allow to invoke more sync workers once we have reached the
     393                 :      * sync worker limit per subscription. So, just return silently as we
     394                 :      * might get here because of an otherwise harmless race condition.
     395                 :      */
     396 GIC         274 :     if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
     397                 :     {
     398 UIC           0 :         LWLockRelease(LogicalRepWorkerLock);
     399 UNC           0 :         return false;
     400                 :     }
     401                 : 
     402 GNC         274 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     403                 : 
     404                 :     /*
     405                 :      * Return false if the number of parallel apply workers reached the limit
     406                 :      * per subscription.
     407                 :      */
     408             274 :     if (is_parallel_apply_worker &&
     409              10 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     410                 :     {
     411 UNC           0 :         LWLockRelease(LogicalRepWorkerLock);
     412               0 :         return false;
     413                 :     }
     414 ECB             : 
     415                 :     /*
     416                 :      * However if there are no more free worker slots, inform user about it
     417                 :      * before exiting.
     418                 :      */
     419 GIC         274 :     if (worker == NULL)
     420                 :     {
     421 LBC           0 :         LWLockRelease(LogicalRepWorkerLock);
     422 UIC           0 :         ereport(WARNING,
     423 EUB             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     424                 :                  errmsg("out of logical replication worker slots"),
     425                 :                  errhint("You might need to increase max_logical_replication_workers.")));
     426 UNC           0 :         return false;
     427 EUB             :     }
     428                 : 
     429                 :     /* Prepare the worker slot. */
     430 GIC         274 :     worker->launch_time = now;
     431             274 :     worker->in_use = true;
     432             274 :     worker->generation++;
     433 GBC         274 :     worker->proc = NULL;
     434             274 :     worker->dbid = dbid;
     435 GIC         274 :     worker->userid = userid;
     436             274 :     worker->subid = subid;
     437 GBC         274 :     worker->relid = relid;
     438 GIC         274 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     439             274 :     worker->relstate_lsn = InvalidXLogRecPtr;
     440             274 :     worker->stream_fileset = NULL;
     441 GNC         274 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     442             274 :     worker->parallel_apply = is_parallel_apply_worker;
     443 GBC         274 :     worker->last_lsn = InvalidXLogRecPtr;
     444             274 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     445 GIC         274 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     446             274 :     worker->reply_lsn = InvalidXLogRecPtr;
     447             274 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     448 EUB             : 
     449                 :     /* Before releasing lock, remember generation for future identification. */
     450 GIC         274 :     generation = worker->generation;
     451                 : 
     452             274 :     LWLockRelease(LogicalRepWorkerLock);
     453                 : 
     454                 :     /* Register the new dynamic worker. */
     455             274 :     memset(&bgw, 0, sizeof(bgw));
     456             274 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     457 ECB             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     458 GIC         274 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     459 GBC         274 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     460                 : 
     461 GNC         274 :     if (is_parallel_apply_worker)
     462              10 :         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     463                 :     else
     464             264 :         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     465                 : 
     466 GIC         274 :     if (OidIsValid(relid))
     467             153 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     468 ECB             :                  "logical replication worker for subscription %u sync %u", subid, relid);
     469 GNC         121 :     else if (is_parallel_apply_worker)
     470              10 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     471                 :                  "logical replication parallel apply worker for subscription %u", subid);
     472                 :     else
     473 GIC         111 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     474                 :                  "logical replication apply worker for subscription %u", subid);
     475                 : 
     476 GNC         274 :     if (is_parallel_apply_worker)
     477              10 :         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     478                 :     else
     479             264 :         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
     480                 : 
     481 CBC         274 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     482             274 :     bgw.bgw_notify_pid = MyProcPid;
     483 GIC         274 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     484 EUB             : 
     485 GNC         274 :     if (is_parallel_apply_worker)
     486              10 :         memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     487                 : 
     488 GBC         274 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     489                 :     {
     490                 :         /* Failed to start worker, so clean up the worker slot. */
     491 UIC           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     492               0 :         Assert(generation == worker->generation);
     493               0 :         logicalrep_worker_cleanup(worker);
     494               0 :         LWLockRelease(LogicalRepWorkerLock);
     495 ECB             : 
     496 UIC           0 :         ereport(WARNING,
     497 EUB             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     498                 :                  errmsg("out of background worker slots"),
     499                 :                  errhint("You might need to increase max_worker_processes.")));
     500 UNC           0 :         return false;
     501                 :     }
     502 EUB             : 
     503                 :     /* Now wait until it attaches. */
     504 GNC         274 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     505                 : }
     506 ECB             : 
     507                 : /*
     508                 :  * Internal function to stop the worker and wait until it detaches from the
     509                 :  * slot.
     510                 :  */
     511                 : static void
     512 GNC          45 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     513 ECB             : {
     514                 :     uint16      generation;
     515                 : 
     516 GNC          45 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     517                 : 
     518 ECB             :     /*
     519                 :      * Remember which generation was our worker so we can check if what we see
     520                 :      * is still the same one.
     521                 :      */
     522 CBC          45 :     generation = worker->generation;
     523                 : 
     524 ECB             :     /*
     525                 :      * If we found a worker but it does not have proc set then it is still
     526                 :      * starting up; wait for it to finish starting and then kill it.
     527                 :      */
     528 CBC          45 :     while (worker->in_use && !worker->proc)
     529                 :     {
     530 ECB             :         int         rc;
     531                 : 
     532 CBC           2 :         LWLockRelease(LogicalRepWorkerLock);
     533 ECB             : 
     534                 :         /* Wait a bit --- we don't expect to have to wait long. */
     535 CBC           2 :         rc = WaitLatch(MyLatch,
     536 ECB             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     537                 :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     538                 : 
     539 CBC           2 :         if (rc & WL_LATCH_SET)
     540                 :         {
     541 UIC           0 :             ResetLatch(MyLatch);
     542 LBC           0 :             CHECK_FOR_INTERRUPTS();
     543 ECB             :         }
     544                 : 
     545                 :         /* Recheck worker status. */
     546 GIC           2 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     547 ECB             : 
     548                 :         /*
     549                 :          * Check whether the worker slot is no longer used, which would mean
     550                 :          * that the worker has exited, or whether the worker generation is
     551                 :          * different, meaning that a different worker has taken the slot.
     552                 :          */
     553 GIC           2 :         if (!worker->in_use || worker->generation != generation)
     554 UIC           0 :             return;
     555 EUB             : 
     556                 :         /* Worker has assigned proc, so it has started. */
     557 GBC           2 :         if (worker->proc)
     558 GIC           2 :             break;
     559 EUB             :     }
     560                 : 
     561                 :     /* Now terminate the worker ... */
     562 GNC          45 :     kill(worker->proc->pid, signo);
     563 EUB             : 
     564                 :     /* ... and wait for it to die. */
     565                 :     for (;;)
     566 GIC          55 :     {
     567 ECB             :         int         rc;
     568                 : 
     569                 :         /* is it gone? */
     570 GIC         100 :         if (!worker->proc || worker->generation != generation)
     571                 :             break;
     572                 : 
     573              55 :         LWLockRelease(LogicalRepWorkerLock);
     574                 : 
     575 ECB             :         /* Wait a bit --- we don't expect to have to wait long. */
     576 GIC          55 :         rc = WaitLatch(MyLatch,
     577                 :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     578                 :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     579 ECB             : 
     580 GIC          55 :         if (rc & WL_LATCH_SET)
     581                 :         {
     582              22 :             ResetLatch(MyLatch);
     583              22 :             CHECK_FOR_INTERRUPTS();
     584                 :         }
     585 ECB             : 
     586 GIC          55 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     587                 :     }
     588                 : }
     589                 : 
     590                 : /*
     591                 :  * Stop the logical replication worker for subid/relid, if any.
     592                 :  */
     593                 : void
     594 GNC          67 : logicalrep_worker_stop(Oid subid, Oid relid)
     595                 : {
     596                 :     LogicalRepWorker *worker;
     597                 : 
     598              67 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     599                 : 
     600              67 :     worker = logicalrep_worker_find(subid, relid, false);
     601                 : 
     602              67 :     if (worker)
     603                 :     {
     604              37 :         Assert(!isParallelApplyWorker(worker));
     605              37 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     606                 :     }
     607                 : 
     608              67 :     LWLockRelease(LogicalRepWorkerLock);
     609              67 : }
     610                 : 
     611                 : /*
     612                 :  * Stop the logical replication parallel apply worker corresponding to the
     613                 :  * input slot number.
     614                 :  *
     615                 :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
     616                 :  * worker so that the worker exits cleanly.
     617                 :  */
     618                 : void
     619               5 : logicalrep_pa_worker_stop(int slot_no, uint16 generation)
     620                 : {
     621                 :     LogicalRepWorker *worker;
     622                 : 
     623               5 :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     624                 : 
     625               5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     626                 : 
     627               5 :     worker = &LogicalRepCtx->workers[slot_no];
     628               5 :     Assert(isParallelApplyWorker(worker));
     629                 : 
     630                 :     /*
     631                 :      * Only stop the worker if the generation matches and the worker is alive.
     632                 :      */
     633               5 :     if (worker->generation == generation && worker->proc)
     634               5 :         logicalrep_worker_stop_internal(worker, SIGINT);
     635                 : 
     636 GIC           5 :     LWLockRelease(LogicalRepWorkerLock);
     637               5 : }
     638 ECB             : 
     639                 : /*
     640                 :  * Wake up (using latch) any logical replication worker for specified sub/rel.
     641                 :  */
     642                 : void
     643 GIC         175 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     644                 : {
     645 ECB             :     LogicalRepWorker *worker;
     646                 : 
     647 GIC         175 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     648                 : 
     649 CBC         175 :     worker = logicalrep_worker_find(subid, relid, true);
     650                 : 
     651 GBC         175 :     if (worker)
     652             175 :         logicalrep_worker_wakeup_ptr(worker);
     653                 : 
     654 GIC         175 :     LWLockRelease(LogicalRepWorkerLock);
     655             175 : }
     656 ECB             : 
     657                 : /*
     658                 :  * Wake up (using latch) the specified logical replication worker.
     659                 :  *
     660                 :  * Caller must hold lock, else worker->proc could change under us.
     661                 :  */
     662                 : void
     663 CBC         520 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     664 EUB             : {
     665 GIC         520 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     666                 : 
     667 CBC         520 :     SetLatch(&worker->proc->procLatch);
     668             520 : }
     669                 : 
     670                 : /*
     671                 :  * Attach to a slot.
     672 ECB             :  */
     673                 : void
     674 GIC         315 : logicalrep_worker_attach(int slot)
     675                 : {
     676 ECB             :     /* Block concurrent access. */
     677 GIC         315 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     678                 : 
     679             315 :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     680 CBC         315 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     681                 : 
     682 GIC         315 :     if (!MyLogicalRepWorker->in_use)
     683 ECB             :     {
     684 UIC           0 :         LWLockRelease(LogicalRepWorkerLock);
     685               0 :         ereport(ERROR,
     686 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     687                 :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     688                 :                         slot)));
     689                 :     }
     690                 : 
     691 GIC         315 :     if (MyLogicalRepWorker->proc)
     692 ECB             :     {
     693 LBC           0 :         LWLockRelease(LogicalRepWorkerLock);
     694 UIC           0 :         ereport(ERROR,
     695                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     696 ECB             :                  errmsg("logical replication worker slot %d is already used by "
     697                 :                         "another worker, cannot attach", slot)));
     698                 :     }
     699                 : 
     700 GIC         315 :     MyLogicalRepWorker->proc = MyProc;
     701             315 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     702                 : 
     703             315 :     LWLockRelease(LogicalRepWorkerLock);
     704 CBC         315 : }
     705                 : 
     706                 : /*
     707                 :  * Stop the parallel apply workers if any, and detach the leader apply worker
     708                 :  * (cleans up the worker info).
     709 ECB             :  */
     710                 : static void
     711 CBC         315 : logicalrep_worker_detach(void)
     712                 : {
     713                 :     /* Stop the parallel apply workers. */
     714 GNC         315 :     if (am_leader_apply_worker())
     715                 :     {
     716                 :         List       *workers;
     717                 :         ListCell   *lc;
     718                 : 
     719                 :         /*
     720                 :          * Detach from the error_mq_handle for all parallel apply workers
     721                 :          * before terminating them. This prevents the leader apply worker from
     722                 :          * receiving the worker termination message and sending it to logs
     723                 :          * when the same is already done by the parallel worker.
     724                 :          */
     725             148 :         pa_detach_all_error_mq();
     726                 : 
     727             148 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     728                 : 
     729             148 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
     730             300 :         foreach(lc, workers)
     731                 :         {
     732             152 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     733                 : 
     734             152 :             if (isParallelApplyWorker(w))
     735               3 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     736                 :         }
     737                 : 
     738             148 :         LWLockRelease(LogicalRepWorkerLock);
     739                 :     }
     740                 : 
     741 ECB             :     /* Block concurrent access. */
     742 GIC         315 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     743 ECB             : 
     744 CBC         315 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     745                 : 
     746 GIC         315 :     LWLockRelease(LogicalRepWorkerLock);
     747 CBC         315 : }
     748 ECB             : 
     749                 : /*
     750                 :  * Clean up worker info.
     751                 :  */
     752                 : static void
     753 GIC         315 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     754                 : {
     755             315 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     756                 : 
     757             315 :     worker->in_use = false;
     758 CBC         315 :     worker->proc = NULL;
     759 GIC         315 :     worker->dbid = InvalidOid;
     760             315 :     worker->userid = InvalidOid;
     761             315 :     worker->subid = InvalidOid;
     762 CBC         315 :     worker->relid = InvalidOid;
     763 GNC         315 :     worker->leader_pid = InvalidPid;
     764             315 :     worker->parallel_apply = false;
     765 GIC         315 : }
     766 ECB             : 
     767                 : /*
     768                 :  * Cleanup function for logical replication launcher.
     769                 :  *
     770                 :  * Called on logical replication launcher exit.
     771                 :  */
     772                 : static void
     773 GIC         322 : logicalrep_launcher_onexit(int code, Datum arg)
     774 ECB             : {
     775 CBC         322 :     LogicalRepCtx->launcher_pid = 0;
     776 GIC         322 : }
     777 ECB             : 
     778                 : /*
     779                 :  * Cleanup function.
     780                 :  *
     781                 :  * Called on logical replication worker exit.
     782                 :  */
     783                 : static void
     784 CBC         315 : logicalrep_worker_onexit(int code, Datum arg)
     785                 : {
     786                 :     /* Disconnect gracefully from the remote side. */
     787 GIC         315 :     if (LogRepWorkerWalRcvConn)
     788 CBC         289 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     789                 : 
     790             315 :     logicalrep_worker_detach();
     791                 : 
     792 ECB             :     /* Cleanup fileset used for streaming transactions. */
     793 CBC         315 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     794 GIC          14 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     795 ECB             : 
     796                 :     /*
     797                 :      * Session level locks may be acquired outside of a transaction in
     798                 :      * parallel apply mode and will not be released when the worker
     799                 :      * terminates, so manually release all locks before the worker exits.
     800                 :      */
     801 GNC         315 :     LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     802                 : 
     803 CBC         315 :     ApplyLauncherWakeup();
     804 GIC         315 : }
     805                 : 
     806                 : /*
     807                 :  * Count the number of registered (not necessarily running) sync workers
     808                 :  * for a subscription.
     809                 :  */
     810                 : int
     811 CBC         977 : logicalrep_sync_worker_count(Oid subid)
     812                 : {
     813 ECB             :     int         i;
     814 GIC         977 :     int         res = 0;
     815 ECB             : 
     816 CBC         977 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     817                 : 
     818                 :     /* Count the sync workers registered for the given subscription id. */
     819 GIC        5049 :     for (i = 0; i < max_logical_replication_workers; i++)
     820                 :     {
     821            4072 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     822 ECB             : 
     823 GIC        4072 :         if (w->subid == subid && OidIsValid(w->relid))
     824            1208 :             res++;
     825 ECB             :     }
     826                 : 
     827 CBC         977 :     return res;
     828 ECB             : }
     829                 : 
     830                 : /*
     831                 :  * Count the number of registered (but not necessarily running) parallel apply
     832                 :  * workers for a subscription.
     833                 :  */
     834                 : static int
     835 GNC         274 : logicalrep_pa_worker_count(Oid subid)
     836                 : {
     837                 :     int         i;
     838             274 :     int         res = 0;
     839                 : 
     840             274 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     841                 : 
     842                 :     /*
     843                 :      * Scan all attached parallel apply workers, only counting those which
     844                 :      * have the given subscription id.
     845                 :      */
     846            1458 :     for (i = 0; i < max_logical_replication_workers; i++)
     847                 :     {
     848            1184 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     849                 : 
     850            1184 :         if (w->subid == subid && isParallelApplyWorker(w))
     851               2 :             res++;
     852                 :     }
     853                 : 
     854             274 :     return res;
     855                 : }
     856                 : 
     857 ECB             : /*
     858                 :  * ApplyLauncherShmemSize
     859 EUB             :  *      Compute space needed for replication launcher shared memory
     860                 :  */
     861                 : Size
     862 GIC        6390 : ApplyLauncherShmemSize(void)
     863                 : {
     864                 :     Size        size;
     865                 : 
     866 ECB             :     /*
     867                 :      * Need the fixed struct and the array of LogicalRepWorker.
     868 EUB             :      */
     869 GBC        6390 :     size = sizeof(LogicalRepCtxStruct);
     870 GIC        6390 :     size = MAXALIGN(size);
     871            6390 :     size = add_size(size, mul_size(max_logical_replication_workers,
     872                 :                                    sizeof(LogicalRepWorker)));
     873            6390 :     return size;
     874                 : }
     875 ECB             : 
     876                 : /*
     877                 :  * ApplyLauncherRegister
     878                 :  *      Register a background worker running the logical replication launcher.
     879                 :  */
     880                 : void
     881 GIC         598 : ApplyLauncherRegister(void)
     882                 : {
     883                 :     BackgroundWorker bgw;
     884                 : 
     885             598 :     if (max_logical_replication_workers == 0)
     886 LBC           0 :         return;
     887                 : 
     888 GIC         598 :     memset(&bgw, 0, sizeof(bgw));
     889 CBC         598 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     890                 :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     891 GIC         598 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     892             598 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     893             598 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     894             598 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     895                 :              "logical replication launcher");
     896             598 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     897                 :              "logical replication launcher");
     898             598 :     bgw.bgw_restart_time = 5;
     899             598 :     bgw.bgw_notify_pid = 0;
     900 CBC         598 :     bgw.bgw_main_arg = (Datum) 0;
     901                 : 
     902             598 :     RegisterBackgroundWorker(&bgw);
     903                 : }
     904 ECB             : 
     905                 : /*
     906                 :  * ApplyLauncherShmemInit
     907                 :  *      Allocate and initialize replication launcher shared memory
     908                 :  */
     909                 : void
     910 CBC        1826 : ApplyLauncherShmemInit(void)
     911                 : {
     912                 :     bool        found;
     913 ECB             : 
     914 GIC        1826 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     915            1826 :         ShmemInitStruct("Logical Replication Launcher Data",
     916                 :                         ApplyLauncherShmemSize(),
     917 ECB             :                         &found);
     918                 : 
     919 CBC        1826 :     if (!found)
     920                 :     {
     921 ECB             :         int         slot;
     922                 : 
     923 GIC        1826 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     924                 : 
     925 GNC        1826 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     926            1826 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     927                 : 
     928                 :         /* Initialize memory and spin locks for each worker slot. */
     929 GIC        9134 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     930                 :         {
     931 CBC        7308 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     932                 : 
     933            7308 :             memset(worker, 0, sizeof(LogicalRepWorker));
     934 GIC        7308 :             SpinLockInit(&worker->relmutex);
     935 ECB             :         }
     936                 :     }
     937 CBC        1826 : }
     938 ECB             : 
     939                 : /*
     940                 :  * Initialize or attach to the dynamic shared hash table that stores the
     941                 :  * last-start times, if not already done.
     942                 :  * This must be called before accessing the table.
     943                 :  */
     944                 : static void
     945 GNC         358 : logicalrep_launcher_attach_dshmem(void)
     946                 : {
     947                 :     MemoryContext oldcontext;
     948                 : 
     949                 :     /* Quick exit if we already did this. */
     950             358 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
     951             320 :         last_start_times != NULL)
     952             241 :         return;
     953                 : 
     954                 :     /* Otherwise, use a lock to ensure only one process creates the table. */
     955             117 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     956                 : 
     957                 :     /* Be sure any local memory allocated by DSA routines is persistent. */
     958             117 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
     959                 : 
     960             117 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
     961                 :     {
     962                 :         /* Initialize dynamic shared hash table for last-start times. */
     963              38 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
     964              38 :         dsa_pin(last_start_times_dsa);
     965              38 :         dsa_pin_mapping(last_start_times_dsa);
     966              38 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
     967                 : 
     968                 :         /* Store handles in shared memory for other backends to use. */
     969              38 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
     970              38 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
     971                 :     }
     972              79 :     else if (!last_start_times)
     973                 :     {
     974                 :         /* Attach to existing dynamic shared hash table. */
     975              79 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
     976              79 :         dsa_pin_mapping(last_start_times_dsa);
     977              79 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
     978              79 :                                          LogicalRepCtx->last_start_dsh, 0);
     979                 :     }
     980                 : 
     981             117 :     MemoryContextSwitchTo(oldcontext);
     982             117 :     LWLockRelease(LogicalRepWorkerLock);
     983                 : }
     984                 : 
     985                 : /*
     986                 :  * Set the last-start time for the subscription.
     987                 :  */
     988                 : static void
     989             111 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
     990                 : {
     991                 :     LauncherLastStartTimesEntry *entry;
     992                 :     bool        found;
     993                 : 
     994             111 :     logicalrep_launcher_attach_dshmem();
     995                 : 
     996             111 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
     997             111 :     entry->last_start_time = start_time;
     998             111 :     dshash_release_lock(last_start_times, entry);
     999             111 : }
    1000                 : 
    1001                 : /*
    1002                 :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1003                 :  */
    1004                 : static TimestampTz
    1005             139 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1006                 : {
    1007                 :     LauncherLastStartTimesEntry *entry;
    1008                 :     TimestampTz ret;
    1009                 : 
    1010             139 :     logicalrep_launcher_attach_dshmem();
    1011                 : 
    1012             139 :     entry = dshash_find(last_start_times, &subid, false);
    1013             139 :     if (entry == NULL)
    1014              93 :         return 0;
    1015                 : 
    1016              46 :     ret = entry->last_start_time;
    1017              46 :     dshash_release_lock(last_start_times, entry);
    1018                 : 
    1019              46 :     return ret;
    1020                 : }
    1021                 : 
    1022                 : /*
    1023                 :  * Remove the last-start-time entry for the subscription, if one exists.
    1024                 :  *
    1025                 :  * This has two use-cases: to remove the entry related to a subscription
    1026                 :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1027                 :  * and to allow immediate restart of an apply worker that has exited
    1028                 :  * due to subscription parameter changes.
    1029                 :  */
    1030                 : void
    1031             108 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1032                 : {
    1033             108 :     logicalrep_launcher_attach_dshmem();
    1034                 : 
    1035             108 :     (void) dshash_delete_key(last_start_times, &subid);
    1036             108 : }
    1037                 : 
    1038 ECB             : /*
    1039                 :  * Wakeup the launcher on commit if requested.
    1040                 :  */
    1041                 : void
    1042 CBC      485915 : AtEOXact_ApplyLauncher(bool isCommit)
    1043                 : {
    1044 GIC      485915 :     if (isCommit)
    1045                 :     {
    1046          465429 :         if (on_commit_launcher_wakeup)
    1047              83 :             ApplyLauncherWakeup();
    1048                 :     }
    1049                 : 
    1050 CBC      485915 :     on_commit_launcher_wakeup = false;
    1051 GIC      485915 : }
    1052 ECB             : 
    1053                 : /*
    1054                 :  * Request wakeup of the launcher on commit of the transaction.
    1055                 :  *
    1056                 :  * This is used to signal the launcher to stop sleeping and process the
    1057                 :  * subscriptions when the current transaction commits.  It should be used
    1058                 :  * when a new tuple has been added to the pg_subscription catalog.
    1059                 :  */
    1060                 : void
    1061 CBC          83 : ApplyLauncherWakeupAtCommit(void)
    1062                 : {
    1063 GIC          83 :     if (!on_commit_launcher_wakeup)
    1064 CBC          83 :         on_commit_launcher_wakeup = true;
    1065              83 : }
    1066                 : 
    1067 ECB             : static void
    1068 GIC         398 : ApplyLauncherWakeup(void)
    1069                 : {
    1070 CBC         398 :     if (LogicalRepCtx->launcher_pid != 0)
    1071             381 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1072 GIC         398 : }
    1073                 : 
    1074                 : /*
    1075                 :  * Main loop for the apply launcher process.
    1076                 :  */
    1077                 : void
    1078 CBC         322 : ApplyLauncherMain(Datum main_arg)
    1079                 : {
    1080 GIC         322 :     ereport(DEBUG1,
    1081                 :             (errmsg_internal("logical replication launcher started")));
    1082                 : 
    1083             322 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1084                 : 
    1085             322 :     Assert(LogicalRepCtx->launcher_pid == 0);
    1086 CBC         322 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1087                 : 
    1088                 :     /* Establish signal handlers. */
    1089             322 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1090 GIC         322 :     pqsignal(SIGTERM, die);
    1091 CBC         322 :     BackgroundWorkerUnblockSignals();
    1092                 : 
    1093                 :     /*
    1094 ECB             :      * Establish connection to nailed catalogs (we only ever access
    1095                 :      * pg_subscription).
    1096                 :      */
    1097 GIC         322 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1098 ECB             : 
    1099                 :     /* Enter main loop */
    1100                 :     for (;;)
    1101 GIC        1715 :     {
    1102 ECB             :         int         rc;
    1103                 :         List       *sublist;
    1104                 :         ListCell   *lc;
    1105                 :         MemoryContext subctx;
    1106                 :         MemoryContext oldctx;
    1107 GIC        2037 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1108                 : 
    1109 CBC        2037 :         CHECK_FOR_INTERRUPTS();
    1110                 : 
    1111                 :         /* Use temporary context to avoid leaking memory across cycles. */
    1112 GNC        2036 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1113                 :                                        "Logical Replication Launcher sublist",
    1114                 :                                        ALLOCSET_DEFAULT_SIZES);
    1115            2036 :         oldctx = MemoryContextSwitchTo(subctx);
    1116 ECB             : 
    1117                 :         /* Start any missing workers for enabled subscriptions. */
    1118 GNC        2036 :         sublist = get_subscription_list();
    1119            2412 :         foreach(lc, sublist)
    1120                 :         {
    1121             376 :             Subscription *sub = (Subscription *) lfirst(lc);
    1122                 :             LogicalRepWorker *w;
    1123                 :             TimestampTz last_start;
    1124                 :             TimestampTz now;
    1125                 :             long        elapsed;
    1126 ECB             : 
    1127 GNC         376 :             if (!sub->enabled)
    1128              22 :                 continue;
    1129 ECB             : 
    1130 GNC         354 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1131             354 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1132             354 :             LWLockRelease(LogicalRepWorkerLock);
    1133                 : 
    1134             354 :             if (w != NULL)
    1135             215 :                 continue;       /* worker is running already */
    1136                 : 
    1137                 :             /*
    1138                 :              * If the worker is eligible to start now, launch it.  Otherwise,
    1139                 :              * adjust wait_time so that we'll wake up as soon as it can be
    1140                 :              * started.
    1141                 :              *
    1142                 :              * Each subscription's apply worker can only be restarted once per
    1143                 :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1144                 :              * repeatedly restart the worker as fast as possible.  In cases
    1145                 :              * where a restart is expected (e.g., subscription parameter
    1146                 :              * changes), another process should remove the last-start entry
    1147                 :              * for the subscription so that the worker can be restarted
    1148                 :              * without waiting for wal_retrieve_retry_interval to elapse.
    1149                 :              */
    1150             139 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1151             139 :             now = GetCurrentTimestamp();
    1152             139 :             if (last_start == 0 ||
    1153              46 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1154                 :             {
    1155             111 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1156             111 :                 logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
    1157                 :                                          sub->owner, InvalidOid,
    1158                 :                                          DSM_HANDLE_INVALID);
    1159                 :             }
    1160                 :             else
    1161                 :             {
    1162              28 :                 wait_time = Min(wait_time,
    1163                 :                                 wal_retrieve_retry_interval - elapsed);
    1164                 :             }
    1165 ECB             :         }
    1166                 : 
    1167                 :         /* Switch back to original memory context. */
    1168 GNC        2036 :         MemoryContextSwitchTo(oldctx);
    1169                 :         /* Clean the temporary memory. */
    1170            2036 :         MemoryContextDelete(subctx);
    1171                 : 
    1172 ECB             :         /* Wait for more work. */
    1173 CBC        2036 :         rc = WaitLatch(MyLatch,
    1174 ECB             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1175                 :                        wait_time,
    1176                 :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1177                 : 
    1178 GIC        2033 :         if (rc & WL_LATCH_SET)
    1179 ECB             :         {
    1180 CBC        2023 :             ResetLatch(MyLatch);
    1181            2023 :             CHECK_FOR_INTERRUPTS();
    1182                 :         }
    1183 ECB             : 
    1184 GIC        1715 :         if (ConfigReloadPending)
    1185                 :         {
    1186              25 :             ConfigReloadPending = false;
    1187              25 :             ProcessConfigFile(PGC_SIGHUP);
    1188                 :         }
    1189                 :     }
    1190                 : 
    1191 ECB             :     /* Not reachable */
    1192                 : }
    1193                 : 
    1194                 : /*
    1195                 :  * Is current process the logical replication launcher?
    1196                 :  */
    1197                 : bool
    1198 GIC         336 : IsLogicalLauncher(void)
    1199                 : {
    1200 CBC         336 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1201                 : }
    1202                 : 
    1203                 : /*
    1204                 :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1205                 :  * parallel apply worker, otherwise, return InvalidPid.
    1206                 :  */
    1207                 : pid_t
    1208 GNC         580 : GetLeaderApplyWorkerPid(pid_t pid)
    1209                 : {
    1210             580 :     int         leader_pid = InvalidPid;
    1211                 :     int         i;
    1212                 : 
    1213             580 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1214                 : 
    1215            2900 :     for (i = 0; i < max_logical_replication_workers; i++)
    1216                 :     {
    1217            2320 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1218                 : 
    1219            2320 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1220                 :         {
    1221 UNC           0 :             leader_pid = w->leader_pid;
    1222               0 :             break;
    1223                 :         }
    1224                 :     }
    1225                 : 
    1226 GNC         580 :     LWLockRelease(LogicalRepWorkerLock);
    1227                 : 
    1228             580 :     return leader_pid;
    1229                 : }
    1230                 : 
    1231                 : /*
    1232 ECB             :  * Returns state of the subscriptions.
    1233                 :  */
    1234                 : Datum
    1235 CBC           1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1236                 : {
    1237                 : #define PG_STAT_GET_SUBSCRIPTION_COLS   9
    1238               1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1239                 :     int         i;
    1240               1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1241                 : 
    1242               1 :     InitMaterializedSRF(fcinfo, 0);
    1243 ECB             : 
    1244                 :     /* Make sure we get consistent view of the workers. */
    1245 GIC           1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1246 ECB             : 
    1247 GIC           5 :     for (i = 0; i < max_logical_replication_workers; i++)
    1248                 :     {
    1249                 :         /* for each row */
    1250 GNC           4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1251               4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1252                 :         int         worker_pid;
    1253                 :         LogicalRepWorker worker;
    1254 ECB             : 
    1255 GIC           4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1256                 :                sizeof(LogicalRepWorker));
    1257               4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1258               2 :             continue;
    1259 ECB             : 
    1260 CBC           2 :         if (OidIsValid(subid) && worker.subid != subid)
    1261 LBC           0 :             continue;
    1262                 : 
    1263 GIC           2 :         worker_pid = worker.proc->pid;
    1264 ECB             : 
    1265 GIC           2 :         values[0] = ObjectIdGetDatum(worker.subid);
    1266 CBC           2 :         if (OidIsValid(worker.relid))
    1267 UIC           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1268                 :         else
    1269 CBC           2 :             nulls[1] = true;
    1270               2 :         values[2] = Int32GetDatum(worker_pid);
    1271                 : 
    1272 GNC           2 :         if (isParallelApplyWorker(&worker))
    1273 UNC           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1274                 :         else
    1275 GNC           2 :             nulls[3] = true;
    1276                 : 
    1277               2 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1278 CBC           1 :             nulls[4] = true;
    1279                 :         else
    1280 GNC           1 :             values[4] = LSNGetDatum(worker.last_lsn);
    1281               2 :         if (worker.last_send_time == 0)
    1282 UIC           0 :             nulls[5] = true;
    1283 ECB             :         else
    1284 GNC           2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1285               2 :         if (worker.last_recv_time == 0)
    1286 LBC           0 :             nulls[6] = true;
    1287                 :         else
    1288 GNC           2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1289               2 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1290 CBC           1 :             nulls[7] = true;
    1291                 :         else
    1292 GNC           1 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1293               2 :         if (worker.reply_time == 0)
    1294 UNC           0 :             nulls[8] = true;
    1295                 :         else
    1296 GNC           2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1297                 : 
    1298 GIC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1299                 :                              values, nulls);
    1300                 : 
    1301 ECB             :         /*
    1302                 :          * If only a single subscription was requested, and we found it,
    1303                 :          * break.
    1304                 :          */
    1305 GIC           2 :         if (OidIsValid(subid))
    1306 LBC           0 :             break;
    1307                 :     }
    1308 ECB             : 
    1309 CBC           1 :     LWLockRelease(LogicalRepWorkerLock);
    1310 ECB             : 
    1311 CBC           1 :     return (Datum) 0;
    1312                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a