LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 89.7 % 445 399 10 12 23 1 12 164 142 81 30 266 3 40
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 31 31 22 9 29 2
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 91.9 % 149 137 10 1 1 135 2
Legend: Lines: hit not hit (120,180] days: 100.0 % 1 1 1
(240..) days: 88.5 % 295 261 11 22 1 12 164 7 78 21 155
Function coverage date bins:
(60,120] days: 100.0 % 9 9 9
(240..) days: 52.4 % 42 22 22 20

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  * launcher.c
                                  3                 :  *     PostgreSQL logical replication worker launcher process
                                  4                 :  *
                                  5                 :  * Copyright (c) 2016-2023, PostgreSQL Global Development Group
                                  6                 :  *
                                  7                 :  * IDENTIFICATION
                                  8                 :  *    src/backend/replication/logical/launcher.c
                                  9                 :  *
                                 10                 :  * NOTES
                                 11                 :  *    This module contains the logical replication worker launcher which
                                 12                 :  *    uses the background worker infrastructure to start the logical
                                 13                 :  *    replication workers for every enabled subscription.
                                 14                 :  *
                                 15                 :  *-------------------------------------------------------------------------
                                 16                 :  */
                                 17                 : 
                                 18                 : #include "postgres.h"
                                 19                 : 
                                 20                 : #include "access/heapam.h"
                                 21                 : #include "access/htup.h"
                                 22                 : #include "access/htup_details.h"
                                 23                 : #include "access/tableam.h"
                                 24                 : #include "access/xact.h"
                                 25                 : #include "catalog/pg_subscription.h"
                                 26                 : #include "catalog/pg_subscription_rel.h"
                                 27                 : #include "funcapi.h"
                                 28                 : #include "lib/dshash.h"
                                 29                 : #include "libpq/pqsignal.h"
                                 30                 : #include "miscadmin.h"
                                 31                 : #include "pgstat.h"
                                 32                 : #include "postmaster/bgworker.h"
                                 33                 : #include "postmaster/fork_process.h"
                                 34                 : #include "postmaster/interrupt.h"
                                 35                 : #include "postmaster/postmaster.h"
                                 36                 : #include "replication/logicallauncher.h"
                                 37                 : #include "replication/logicalworker.h"
                                 38                 : #include "replication/slot.h"
                                 39                 : #include "replication/walreceiver.h"
                                 40                 : #include "replication/worker_internal.h"
                                 41                 : #include "storage/ipc.h"
                                 42                 : #include "storage/proc.h"
                                 43                 : #include "storage/procarray.h"
                                 44                 : #include "storage/procsignal.h"
                                 45                 : #include "tcop/tcopprot.h"
                                 46                 : #include "utils/builtins.h"
                                 47                 : #include "utils/memutils.h"
                                 48                 : #include "utils/pg_lsn.h"
                                 49                 : #include "utils/ps_status.h"
                                 50                 : #include "utils/snapmgr.h"
                                 51                 : #include "utils/timeout.h"
                                 52                 : 
                                 53                 : /* max sleep time between cycles (3min) */
                                 54                 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
                                 55                 : 
                                 56                 : /* GUC variables */
                                 57                 : int         max_logical_replication_workers = 4;
                                 58                 : int         max_sync_workers_per_subscription = 2;
                                 59                 : int         max_parallel_apply_workers_per_subscription = 2;
                                 60                 : 
                                 61                 : LogicalRepWorker *MyLogicalRepWorker = NULL;
                                 62                 : 
                                 63                 : typedef struct LogicalRepCtxStruct
                                 64                 : {
                                 65                 :     /* Supervisor process. */
                                 66                 :     pid_t       launcher_pid;
                                 67                 : 
                                 68                 :     /* Hash table holding last start times of subscriptions' apply workers. */
                                 69                 :     dsa_handle  last_start_dsa;
                                 70                 :     dshash_table_handle last_start_dsh;
                                 71                 : 
                                 72                 :     /* Background workers. */
                                 73                 :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
                                 74                 : } LogicalRepCtxStruct;
                                 75                 : 
                                 76                 : static LogicalRepCtxStruct *LogicalRepCtx;
                                 77                 : 
                                 78                 : /* an entry in the last-start-times shared hash table */
                                 79                 : typedef struct LauncherLastStartTimesEntry
                                 80                 : {
                                 81                 :     Oid         subid;          /* OID of logrep subscription (hash key) */
                                 82                 :     TimestampTz last_start_time;    /* last time its apply worker was started */
                                 83                 : } LauncherLastStartTimesEntry;
                                 84                 : 
                                 85                 : /* parameters for the last-start-times shared hash table */
                                 86                 : static const dshash_parameters dsh_params = {
                                 87                 :     sizeof(Oid),
                                 88                 :     sizeof(LauncherLastStartTimesEntry),
                                 89                 :     dshash_memcmp,
                                 90                 :     dshash_memhash,
                                 91                 :     LWTRANCHE_LAUNCHER_HASH
                                 92                 : };
                                 93                 : 
                                 94                 : static dsa_area *last_start_times_dsa = NULL;
                                 95                 : static dshash_table *last_start_times = NULL;
                                 96                 : 
                                 97                 : static bool on_commit_launcher_wakeup = false;
                                 98                 : 
                                 99                 : 
                                100                 : static void ApplyLauncherWakeup(void);
                                101                 : static void logicalrep_launcher_onexit(int code, Datum arg);
                                102                 : static void logicalrep_worker_onexit(int code, Datum arg);
                                103                 : static void logicalrep_worker_detach(void);
                                104                 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
                                105                 : static int  logicalrep_pa_worker_count(Oid subid);
                                106                 : static void logicalrep_launcher_attach_dshmem(void);
                                107                 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
                                108                 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
                                109                 : 
                                110                 : 
                                111                 : /*
                                112                 :  * Load the list of subscriptions.
                                113                 :  *
                                114                 :  * Only the fields interesting for worker start/stop functions are filled for
                                115                 :  * each subscription.
                                116                 :  */
                                117                 : static List *
 2271 peter_e                   118 GIC        2036 : get_subscription_list(void)
                                119                 : {
                                120            2036 :     List       *res = NIL;
                                121                 :     Relation    rel;
                                122                 :     TableScanDesc scan;
                                123                 :     HeapTuple   tup;
                                124                 :     MemoryContext resultcxt;
                                125                 : 
                                126                 :     /* This is the context that we will allocate our output data in */
                                127            2036 :     resultcxt = CurrentMemoryContext;
                                128                 : 
                                129                 :     /*
                                130                 :      * Start a transaction so we can access pg_subscription, and get a snapshot.
                                131                 :      * We don't have a use for the snapshot itself, but we're interested in
                                132                 :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
                                133                 :      * for anything that reads heap pages, because HOT may decide to prune
                                134                 :      * them even if the process doesn't attempt to modify any tuples.)
                                135                 :      *
                                136                 :      * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
                                137                 :      * not pushed/active does not reliably prevent HOT pruning (->xmin could
                                138                 :      * e.g. be cleared when cache invalidations are processed).
                                139                 :      */
                                140            2036 :     StartTransactionCommand();
                                141            2036 :     (void) GetTransactionSnapshot();
                                142                 : 
 1539 andres                    143            2036 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
 1490                           144            2036 :     scan = table_beginscan_catalog(rel, 0, NULL);
                                145                 : 
 2271 peter_e                   146            2412 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
                                147                 :     {
                                148             376 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
 2153 bruce                     149 ECB             :         Subscription *sub;
                                150                 :         MemoryContext oldcxt;
 2271 peter_e                   151                 : 
                                152                 :         /*
                                153                 :          * Allocate our results in the caller's context, not the
                                154                 :          * transaction's. We do this inside the loop, and restore the original
                                155                 :          * context at the end, so that leaky things like heap_getnext() are
                                156                 :          * not called in a potentially long-lived context.
                                157                 :          */
 2271 peter_e                   158 CBC         376 :         oldcxt = MemoryContextSwitchTo(resultcxt);
                                159                 : 
 2186 peter_e                   160 GIC         376 :         sub = (Subscription *) palloc0(sizeof(Subscription));
 1601 andres                    161             376 :         sub->oid = subform->oid;
 2271 peter_e                   162             376 :         sub->dbid = subform->subdbid;
                                163             376 :         sub->owner = subform->subowner;
                                164             376 :         sub->enabled = subform->subenabled;
                                165             376 :         sub->name = pstrdup(NameStr(subform->subname));
                                166                 :         /* We don't fill fields we are not interested in. */
                                167                 : 
                                168             376 :         res = lappend(res, sub);
                                169             376 :         MemoryContextSwitchTo(oldcxt);
                                170                 :     }
 2271 peter_e                   171 ECB             : 
 1490 andres                    172 CBC        2036 :     table_endscan(scan);
 1539 andres                    173 GIC        2036 :     table_close(rel, AccessShareLock);
 2271 peter_e                   174 ECB             : 
 2271 peter_e                   175 CBC        2036 :     CommitTransactionCommand();
                                176                 : 
                                177            2036 :     return res;
                                178                 : }
 2271 peter_e                   179 ECB             : 
                                180                 : /*
                                181                 :  * Wait for a background worker to start up and attach to the shmem context.
                                182                 :  *
                                183                 :  * This is only needed for cleaning up the shared memory in case the worker
                                184                 :  * fails to attach.
                                185                 :  *
                                186                 :  * Returns whether the attach was successful.
                                187                 :  */
                                188                 : static bool
 2271 peter_e                   189 GIC         274 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                190                 :                                uint16 generation,
 2271 peter_e                   191 ECB             :                                BackgroundWorkerHandle *handle)
                                192                 : {
                                193                 :     BgwHandleStatus status;
                                194                 :     int         rc;
                                195                 : 
                                196                 :     for (;;)
 2271 peter_e                   197 CBC         774 :     {
 2271 peter_e                   198 ECB             :         pid_t       pid;
                                199                 : 
 2271 peter_e                   200 GIC        1048 :         CHECK_FOR_INTERRUPTS();
 2271 peter_e                   201 ECB             : 
 2174 peter_e                   202 CBC        1048 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                203                 : 
                                204                 :         /* Worker either died or has started. Return false if died. */
                                205            1048 :         if (!worker->in_use || worker->proc)
 2174 peter_e                   206 ECB             :         {
 2174 peter_e                   207 GIC         274 :             LWLockRelease(LogicalRepWorkerLock);
   90 akapila                   208 GNC         274 :             return worker->in_use;
                                209                 :         }
 2174 peter_e                   210 ECB             : 
 2174 peter_e                   211 GIC         774 :         LWLockRelease(LogicalRepWorkerLock);
                                212                 : 
                                213                 :         /* Check if worker has died before attaching, and clean up after it. */
 2271                           214             774 :         status = GetBackgroundWorkerPid(handle, &pid);
                                215                 : 
                                216             774 :         if (status == BGWH_STOPPED)
                                217                 :         {
 2174 peter_e                   218 UIC           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                219                 :             /* Ensure that this was indeed the worker we waited for. */
                                220               0 :             if (generation == worker->generation)
                                221               0 :                 logicalrep_worker_cleanup(worker);
 2174 peter_e                   222 LBC           0 :             LWLockRelease(LogicalRepWorkerLock);
   90 akapila                   223 UNC           0 :             return false;
                                224                 :         }
                                225                 : 
                                226                 :         /*
                                227                 :          * We need timeout because we generally don't get notified via latch
                                228                 :          * about the worker attach.  But we don't expect to have to wait long.
                                229                 :          */
 2271 peter_e                   230 CBC         774 :         rc = WaitLatch(MyLatch,
                                231                 :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                232                 :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
 2271 peter_e                   233 ECB             : 
 2133 andres                    234 GIC         774 :         if (rc & WL_LATCH_SET)
 2133 andres                    235 ECB             :         {
 2133 andres                    236 GIC         296 :             ResetLatch(MyLatch);
                                237             296 :             CHECK_FOR_INTERRUPTS();
 2133 andres                    238 ECB             :         }
                                239                 :     }
 2271 peter_e                   240                 : }
                                241                 : 
                                242                 : /*
                                243                 :  * Walks the workers array and searches for one that matches the given
 2208                           244                 :  * subscription id and relid.
                                245                 :  *
                                246                 :  * We are only interested in the leader apply worker or table sync worker.
                                247                 :  */
                                248                 : LogicalRepWorker *
 2208 peter_e                   249 CBC        2137 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
                                250                 : {
 2153 bruce                     251 ECB             :     int         i;
 2153 bruce                     252 GIC        2137 :     LogicalRepWorker *res = NULL;
 2271 peter_e                   253 EUB             : 
 2271 peter_e                   254 GIC        2137 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 2208 peter_e                   255 EUB             : 
 2271                           256                 :     /* Search for attached worker for a given subscription id. */
 2271 peter_e                   257 GBC        6794 :     for (i = 0; i < max_logical_replication_workers; i++)
 2271 peter_e                   258 EUB             :     {
 2153 bruce                     259 GIC        5922 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                260                 : 
                                261                 :         /* Skip parallel apply workers. */
   90 akapila                   262 GNC        5922 :         if (isParallelApplyWorker(w))
                                263            1246 :             continue;
                                264                 : 
 2174 peter_e                   265 GIC        4676 :         if (w->in_use && w->subid == subid && w->relid == relid &&
                                266            1265 :             (!only_running || w->proc))
                                267                 :         {
 2271                           268            1265 :             res = w;
 2271 peter_e                   269 CBC        1265 :             break;
                                270                 :         }
                                271                 :     }
                                272                 : 
                                273            2137 :     return res;
                                274                 : }
 2271 peter_e                   275 ECB             : 
 2074                           276                 : /*
                                277                 :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
                                278                 :  * the subscription, instead of just one.
                                279                 :  */
                                280                 : List *
 2074 peter_e                   281 GIC         373 : logicalrep_workers_find(Oid subid, bool only_running)
                                282                 : {
                                283                 :     int         i;
                                284             373 :     List       *res = NIL;
                                285                 : 
                                286             373 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                287                 : 
 2074 peter_e                   288 ECB             :     /* Search for attached worker for a given subscription id. */
 2074 peter_e                   289 GIC        1957 :     for (i = 0; i < max_logical_replication_workers; i++)
                                290                 :     {
 2074 peter_e                   291 CBC        1584 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                292                 : 
                                293            1584 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
 2074 peter_e                   294 GIC         240 :             res = lappend(res, w);
                                295                 :     }
 2074 peter_e                   296 ECB             : 
 2074 peter_e                   297 GIC         373 :     return res;
 2074 peter_e                   298 ECB             : }
                                299                 : 
                                300                 : /*
                                301                 :  * Start new logical replication background worker, if possible.
                                302                 :  *
                                303                 :  * Returns true on success, false on failure.
 2271                           304                 :  */
                                305                 : bool
 2208 peter_e                   306 CBC         274 : logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
                                307                 :                          Oid relid, dsm_handle subworker_dsm)
                                308                 : {
 2153 bruce                     309 ECB             :     BackgroundWorker bgw;
 2271 peter_e                   310                 :     BackgroundWorkerHandle *bgw_handle;
                                311                 :     uint16      generation;
                                312                 :     int         i;
 2153 bruce                     313 GIC         274 :     int         slot = 0;
 2153 bruce                     314 CBC         274 :     LogicalRepWorker *worker = NULL;
                                315                 :     int         nsyncworkers;
                                316                 :     int         nparallelapplyworkers;
                                317                 :     TimestampTz now;
   90 akapila                   318 GNC         274 :     bool        is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
                                319                 : 
                                320                 :     /* Sanity check - tablesync worker cannot be a subworker */
                                321             274 :     Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
                                322                 : 
 2146 peter_e                   323 GIC         274 :     ereport(DEBUG1,
                                324                 :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
                                325                 :                              subname)));
                                326                 : 
 2271 peter_e                   327 ECB             :     /* Report this after the initial starting message for consistency. */
 2271 peter_e                   328 GIC         274 :     if (max_replication_slots == 0)
 2271 peter_e                   329 UIC           0 :         ereport(ERROR,
 2271 peter_e                   330 ECB             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                331                 :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
                                332                 : 
                                333                 :     /*
                                334                 :      * We need to do the modification of the shared memory under lock so that
                                335                 :      * we have a consistent view.
                                336                 :      */
 2271 peter_e                   337 CBC         274 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                338                 : 
 2174                           339             274 : retry:
 2271 peter_e                   340 ECB             :     /* Find unused worker slot. */
 2174 peter_e                   341 GIC         503 :     for (i = 0; i < max_logical_replication_workers; i++)
                                342                 :     {
 2174 peter_e                   343 CBC         503 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                344                 : 
 2174 peter_e                   345 GIC         503 :         if (!w->in_use)
                                346                 :         {
                                347             274 :             worker = w;
                                348             274 :             slot = i;
 2271                           349             274 :             break;
                                350                 :         }
                                351                 :     }
 2271 peter_e                   352 ECB             : 
 2174 peter_e                   353 GIC         274 :     nsyncworkers = logicalrep_sync_worker_count(subid);
                                354                 : 
                                355             274 :     now = GetCurrentTimestamp();
                                356                 : 
                                357                 :     /*
                                358                 :      * If we didn't find a free slot, try to do garbage collection.  The
 2174 peter_e                   359 ECB             :      * reason we do this is because if some worker failed to start up and its
                                360                 :      * parent has crashed while waiting, the in_use state was never cleared.
                                361                 :      */
 2174 peter_e                   362 GIC         274 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
                                363                 :     {
 2153 bruce                     364 LBC           0 :         bool        did_cleanup = false;
                                365                 : 
 2174 peter_e                   366 UIC           0 :         for (i = 0; i < max_logical_replication_workers; i++)
 2174 peter_e                   367 ECB             :         {
 2174 peter_e                   368 UIC           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 2174 peter_e                   369 ECB             : 
                                370                 :             /*
                                371                 :              * If the worker was marked in use but didn't manage to attach in
                                372                 :              * time, clean it up.
                                373                 :              */
 2174 peter_e                   374 LBC           0 :             if (w->in_use && !w->proc &&
 2174 peter_e                   375 UBC           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
                                376                 :                                            wal_receiver_timeout))
                                377                 :             {
 2174 peter_e                   378 UIC           0 :                 elog(WARNING,
                                379                 :                      "logical replication worker for subscription %u took too long to start; canceled",
                                380                 :                      w->subid);
                                381                 : 
                                382               0 :                 logicalrep_worker_cleanup(w);
 2174 peter_e                   383 LBC           0 :                 did_cleanup = true;
                                384                 :             }
 2174 peter_e                   385 ECB             :         }
                                386                 : 
 2174 peter_e                   387 LBC           0 :         if (did_cleanup)
 2174 peter_e                   388 UIC           0 :             goto retry;
 2174 peter_e                   389 ECB             :     }
                                390                 : 
                                391                 :     /*
                                392                 :      * We don't allow more sync workers to be launched once we have reached the
  332 tgl                       393                 :      * sync worker limit per subscription. So, just return silently as we
                                394                 :      * might get here because of an otherwise harmless race condition.
 2174 peter_e                   395                 :      */
  355 akapila                   396 GIC         274 :     if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
                                397                 :     {
 2174 peter_e                   398 UIC           0 :         LWLockRelease(LogicalRepWorkerLock);
   90 akapila                   399 UNC           0 :         return false;
                                400                 :     }
                                401                 : 
   90 akapila                   402 GNC         274 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
                                403                 : 
                                404                 :     /*
                                405                 :      * Return false if the number of parallel apply workers reached the limit
                                406                 :      * per subscription.
                                407                 :      */
                                408             274 :     if (is_parallel_apply_worker &&
                                409              10 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
                                410                 :     {
   90 akapila                   411 UNC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                412               0 :         return false;
                                413                 :     }
 2174 peter_e                   414 ECB             : 
                                415                 :     /*
                                416                 :      * However if there are no more free worker slots, inform user about it
                                417                 :      * before exiting.
                                418                 :      */
 2271 peter_e                   419 GIC         274 :     if (worker == NULL)
                                420                 :     {
 2266 fujii                     421 LBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 2271 peter_e                   422 UIC           0 :         ereport(WARNING,
 2271 peter_e                   423 EUB             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                424                 :                  errmsg("out of logical replication worker slots"),
                                425                 :                  errhint("You might need to increase max_logical_replication_workers.")));
   90 akapila                   426 UNC           0 :         return false;
 2271 peter_e                   427 EUB             :     }
                                428                 : 
                                429                 :     /* Prepare the worker slot. */
 2174 peter_e                   430 GIC         274 :     worker->launch_time = now;
                                431             274 :     worker->in_use = true;
                                432             274 :     worker->generation++;
 2208 peter_e                   433 GBC         274 :     worker->proc = NULL;
 2271                           434             274 :     worker->dbid = dbid;
 2271 peter_e                   435 GIC         274 :     worker->userid = userid;
                                436             274 :     worker->subid = subid;
 2208 peter_e                   437 GBC         274 :     worker->relid = relid;
 2208 peter_e                   438 GIC         274 :     worker->relstate = SUBREL_STATE_UNKNOWN;
                                439             274 :     worker->relstate_lsn = InvalidXLogRecPtr;
  584 akapila                   440             274 :     worker->stream_fileset = NULL;
   81 akapila                   441 GNC         274 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
   90                           442             274 :     worker->parallel_apply = is_parallel_apply_worker;
 2208 peter_e                   443 GBC         274 :     worker->last_lsn = InvalidXLogRecPtr;
                                444             274 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
 2208 peter_e                   445 GIC         274 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
                                446             274 :     worker->reply_lsn = InvalidXLogRecPtr;
                                447             274 :     TIMESTAMP_NOBEGIN(worker->reply_time);
 2271 peter_e                   448 EUB             : 
 2029 tgl                       449                 :     /* Before releasing lock, remember generation for future identification. */
 2029 tgl                       450 GIC         274 :     generation = worker->generation;
                                451                 : 
 2271 peter_e                   452             274 :     LWLockRelease(LogicalRepWorkerLock);
                                453                 : 
                                454                 :     /* Register the new dynamic worker. */
 2184 tgl                       455             274 :     memset(&bgw, 0, sizeof(bgw));
 2153 bruce                     456             274 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 2271 peter_e                   457 ECB             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 2271 peter_e                   458 GIC         274 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 2200 rhaas                     459 GBC         274 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
                                460                 : 
   90 akapila                   461 GNC         274 :     if (is_parallel_apply_worker)
                                462              10 :         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
                                463                 :     else
                                464             264 :         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
                                465                 : 
 2208 peter_e                   466 GIC         274 :     if (OidIsValid(relid))
                                467             153 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
 2208 peter_e                   468 ECB             :                  "logical replication worker for subscription %u sync %u", subid, relid);
   90 akapila                   469 GNC         121 :     else if (is_parallel_apply_worker)
                                470              10 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
                                471                 :                  "logical replication parallel apply worker for subscription %u", subid);
                                472                 :     else
 2208 peter_e                   473 GIC         111 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
                                474                 :                  "logical replication apply worker for subscription %u", subid);
                                475                 : 
   90 akapila                   476 GNC         274 :     if (is_parallel_apply_worker)
                                477              10 :         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
                                478                 :     else
                                479             264 :         snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
                                480                 : 
 2271 peter_e                   481 CBC         274 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
                                482             274 :     bgw.bgw_notify_pid = MyProcPid;
 2181 fujii                     483 GIC         274 :     bgw.bgw_main_arg = Int32GetDatum(slot);
 2271 peter_e                   484 EUB             : 
   90 akapila                   485 GNC         274 :     if (is_parallel_apply_worker)
                                486              10 :         memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
                                487                 : 
 2271 peter_e                   488 GBC         274 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
                                489                 :     {
                                490                 :         /* Failed to start worker, so clean up the worker slot. */
 2029 tgl                       491 UIC           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                492               0 :         Assert(generation == worker->generation);
                                493               0 :         logicalrep_worker_cleanup(worker);
                                494               0 :         LWLockRelease(LogicalRepWorkerLock);
 2029 tgl                       495 ECB             : 
 2271 peter_e                   496 UIC           0 :         ereport(WARNING,
 2271 peter_e                   497 EUB             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 2170 tgl                       498                 :                  errmsg("out of background worker slots"),
                                499                 :                  errhint("You might need to increase max_worker_processes.")));
   90 akapila                   500 UNC           0 :         return false;
                                501                 :     }
 2271 peter_e                   502 EUB             : 
                                503                 :     /* Now wait until it attaches. */
   90 akapila                   504 GNC         274 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
                                505                 : }
 2271 peter_e                   506 ECB             : 
                                507                 : /*
                                508                 :  * Internal function to stop the worker and wait until it detaches from the
                                509                 :  * slot.
                                510                 :  */
                                511                 : static void
   90 akapila                   512 GNC          45 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
 2271 peter_e                   513 ECB             : {
 2153 bruce                     514                 :     uint16      generation;
 2271 peter_e                   515                 : 
   90 akapila                   516 GNC          45 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
                                517                 : 
 2174 peter_e                   518 ECB             :     /*
                                519                 :      * Remember which generation was our worker so we can check if what we see
                                520                 :      * is still the same one.
                                521                 :      */
 2174 peter_e                   522 CBC          45 :     generation = worker->generation;
                                523                 : 
 2271 peter_e                   524 ECB             :     /*
 2108 tgl                       525                 :      * If we found a worker but it does not have proc set then it is still
                                526                 :      * starting up; wait for it to finish starting and then kill it.
 2271 peter_e                   527                 :      */
 2174 peter_e                   528 CBC          45 :     while (worker->in_use && !worker->proc)
                                529                 :     {
 2153 bruce                     530 ECB             :         int         rc;
                                531                 : 
 2271 peter_e                   532 CBC           2 :         LWLockRelease(LogicalRepWorkerLock);
 2271 peter_e                   533 ECB             : 
                                534                 :         /* Wait a bit --- we don't expect to have to wait long. */
 2133 andres                    535 CBC           2 :         rc = WaitLatch(MyLatch,
 1598 tmunro                    536 ECB             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                537                 :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                538                 : 
 2133 andres                    539 CBC           2 :         if (rc & WL_LATCH_SET)
                                540                 :         {
 2133 andres                    541 UIC           0 :             ResetLatch(MyLatch);
 2133 andres                    542 LBC           0 :             CHECK_FOR_INTERRUPTS();
 2133 andres                    543 ECB             :         }
                                544                 : 
 2108 tgl                       545                 :         /* Recheck worker status. */
 2271 peter_e                   546 GIC           2 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 2267 peter_e                   547 ECB             : 
                                548                 :         /*
 2174                           549                 :          * Check whether the worker slot is no longer used, which would mean
                                550                 :          * that the worker has exited, or whether the worker generation is
                                551                 :          * different, meaning that a different worker has taken the slot.
 2267                           552                 :          */
 2174 peter_e                   553 GIC           2 :         if (!worker->in_use || worker->generation != generation)
 2267 peter_e                   554 UIC           0 :             return;
 2267 peter_e                   555 EUB             : 
                                556                 :         /* Worker has assigned proc, so it has started. */
 2267 peter_e                   557 GBC           2 :         if (worker->proc)
 2271 peter_e                   558 GIC           2 :             break;
 2271 peter_e                   559 EUB             :     }
                                560                 : 
                                561                 :     /* Now terminate the worker ... */
   90 akapila                   562 GNC          45 :     kill(worker->proc->pid, signo);
 2271 peter_e                   563 EUB             : 
                                564                 :     /* ... and wait for it to die. */
                                565                 :     for (;;)
 2271 peter_e                   566 GIC          55 :     {
 2153 bruce                     567 ECB             :         int         rc;
                                568                 : 
                                569                 :         /* is it gone? */
 2174 peter_e                   570 GIC         100 :         if (!worker->proc || worker->generation != generation)
                                571                 :             break;
                                572                 : 
 2108 tgl                       573              55 :         LWLockRelease(LogicalRepWorkerLock);
                                574                 : 
 2108 tgl                       575 ECB             :         /* Wait a bit --- we don't expect to have to wait long. */
 2133 andres                    576 GIC          55 :         rc = WaitLatch(MyLatch,
                                577                 :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                578                 :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
 2271 peter_e                   579 ECB             : 
 2133 andres                    580 GIC          55 :         if (rc & WL_LATCH_SET)
                                581                 :         {
                                582              22 :             ResetLatch(MyLatch);
                                583              22 :             CHECK_FOR_INTERRUPTS();
                                584                 :         }
 2108 tgl                       585 ECB             : 
 2108 tgl                       586 GIC          55 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                587                 :     }
                                588                 : }
                                589                 : 
                                590                 : /*
                                591                 :  * Stop the logical replication worker for subid/relid, if any.
                                592                 :  */
                                593                 : void
   90 akapila                   594 GNC          67 : logicalrep_worker_stop(Oid subid, Oid relid)
                                595                 : {
                                596                 :     LogicalRepWorker *worker;
                                597                 : 
                                598              67 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                599                 : 
                                600              67 :     worker = logicalrep_worker_find(subid, relid, false);
                                601                 : 
                                602              67 :     if (worker)
                                603                 :     {
                                604              37 :         Assert(!isParallelApplyWorker(worker));
                                605              37 :         logicalrep_worker_stop_internal(worker, SIGTERM);
                                606                 :     }
                                607                 : 
                                608              67 :     LWLockRelease(LogicalRepWorkerLock);
                                609              67 : }
                                610                 : 
                                611                 : /*
                                612                 :  * Stop the logical replication parallel apply worker corresponding to the
                                613                 :  * input slot number.
                                614                 :  *
                                615                 :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
                                616                 :  * worker so that the worker exits cleanly.
                                617                 :  */
                                618                 : void
                                619               5 : logicalrep_pa_worker_stop(int slot_no, uint16 generation)
                                620                 : {
                                621                 :     LogicalRepWorker *worker;
                                622                 : 
                                623               5 :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
                                624                 : 
                                625               5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                626                 : 
                                627               5 :     worker = &LogicalRepCtx->workers[slot_no];
                                628               5 :     Assert(isParallelApplyWorker(worker));
                                629                 : 
                                630                 :     /*
                                631                 :      * Only stop the worker if the generation matches and the worker is alive.
                                632                 :      */
                                633               5 :     if (worker->generation == generation && worker->proc)
                                634               5 :         logicalrep_worker_stop_internal(worker, SIGINT);
                                635                 : 
 2108 tgl                       636 GIC           5 :     LWLockRelease(LogicalRepWorkerLock);
 2271 peter_e                   637               5 : }
 2271 peter_e                   638 ECB             : 
                                639                 : /*
                                640                 :  * Wake up (using latch) any logical replication worker for specified sub/rel.
                                641                 :  */
 2208                           642                 : void
 2208 peter_e                   643 GIC         175 : logicalrep_worker_wakeup(Oid subid, Oid relid)
                                644                 : {
 2153 bruce                     645 ECB             :     LogicalRepWorker *worker;
                                646                 : 
 2208 peter_e                   647 GIC         175 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                648                 : 
 2208 peter_e                   649 CBC         175 :     worker = logicalrep_worker_find(subid, relid, true);
                                650                 : 
 2208 peter_e                   651 GBC         175 :     if (worker)
                                652             175 :         logicalrep_worker_wakeup_ptr(worker);
                                653                 : 
 2109 tgl                       654 GIC         175 :     LWLockRelease(LogicalRepWorkerLock);
 2208 peter_e                   655             175 : }
 2208 peter_e                   656 ECB             : 
                                657                 : /*
                                658                 :  * Wake up (using latch) the specified logical replication worker.
                                659                 :  *
                                660                 :  * Caller must hold lock, else worker->proc could change under us.
                                661                 :  */
                                662                 : void
 2208 peter_e                   663 CBC         520 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
 2208 peter_e                   664 EUB             : {
 2109 tgl                       665 GIC         520 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                666                 : 
 2208 peter_e                   667 CBC         520 :     SetLatch(&worker->proc->procLatch);
                                668             520 : }
                                669                 : 
                                670                 : /*
                                671                 :  * Attach to a slot.
 2271 peter_e                   672 ECB             :  */
                                673                 : void
 2271 peter_e                   674 GIC         315 : logicalrep_worker_attach(int slot)
                                675                 : {
 2271 peter_e                   676 ECB             :     /* Block concurrent access. */
 2271 peter_e                   677 GIC         315 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                678                 : 
                                679             315 :     Assert(slot >= 0 && slot < max_logical_replication_workers);
 2271 peter_e                   680 CBC         315 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
                                681                 : 
 2174 peter_e                   682 GIC         315 :     if (!MyLogicalRepWorker->in_use)
 2174 peter_e                   683 ECB             :     {
 2174 peter_e                   684 UIC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                685               0 :         ereport(ERROR,
 2153 bruce                     686 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                687                 :                  errmsg("logical replication worker slot %d is empty, cannot attach",
                                688                 :                         slot)));
                                689                 :     }
 2174 peter_e                   690                 : 
 2271 peter_e                   691 GIC         315 :     if (MyLogicalRepWorker->proc)
 2174 peter_e                   692 ECB             :     {
 2174 peter_e                   693 LBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 2271 peter_e                   694 UIC           0 :         ereport(ERROR,
                                695                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 2118 tgl                       696 ECB             :                  errmsg("logical replication worker slot %d is already used by "
                                697                 :                         "another worker, cannot attach", slot)));
                                698                 :     }
                                699                 : 
 2271 peter_e                   700 GIC         315 :     MyLogicalRepWorker->proc = MyProc;
                                701             315 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
                                702                 : 
                                703             315 :     LWLockRelease(LogicalRepWorkerLock);
 2271 peter_e                   704 CBC         315 : }
                                705                 : 
                                706                 : /*
                                707                 :  * Stop the parallel apply workers if any, and detach the leader apply worker
                                708                 :  * (cleans up the worker info).
 2271 peter_e                   709 ECB             :  */
                                710                 : static void
 2271 peter_e                   711 CBC         315 : logicalrep_worker_detach(void)
                                712                 : {
                                713                 :     /* Stop the parallel apply workers. */
   90 akapila                   714 GNC         315 :     if (am_leader_apply_worker())
                                715                 :     {
                                716                 :         List       *workers;
                                717                 :         ListCell   *lc;
                                718                 : 
                                719                 :         /*
                                720                 :          * Detach from the error_mq_handle for all parallel apply workers
                                721                 :          * before terminating them. This prevents the leader apply worker from
                                722                 :          * receiving the worker termination message and sending it to logs
                                723                 :          * when the same is already done by the parallel worker.
                                724                 :          */
                                725             148 :         pa_detach_all_error_mq();
                                726                 : 
                                727             148 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                728                 : 
                                729             148 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
                                730             300 :         foreach(lc, workers)
                                731                 :         {
                                732             152 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
                                733                 : 
                                734             152 :             if (isParallelApplyWorker(w))
                                735               3 :                 logicalrep_worker_stop_internal(w, SIGTERM);
                                736                 :         }
                                737                 : 
                                738             148 :         LWLockRelease(LogicalRepWorkerLock);
                                739                 :     }
                                740                 : 
 2271 peter_e                   741 ECB             :     /* Block concurrent access. */
 2271 peter_e                   742 GIC         315 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 2271 peter_e                   743 ECB             : 
 2174 peter_e                   744 CBC         315 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
                                745                 : 
 2271 peter_e                   746 GIC         315 :     LWLockRelease(LogicalRepWorkerLock);
 2271 peter_e                   747 CBC         315 : }
 2271 peter_e                   748 ECB             : 
                                749                 : /*
                                 750                 :  * Clean up worker info.  Caller must hold LogicalRepWorkerLock exclusively.
                                751                 :  */
                                752                 : static void
 2174 peter_e                   753 GIC         315 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
                                754                 : {
                                755             315 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
                                756                 : 
                                757             315 :     worker->in_use = false;
 2174 peter_e                   758 CBC         315 :     worker->proc = NULL;
 2174 peter_e                   759 GIC         315 :     worker->dbid = InvalidOid;
                                760             315 :     worker->userid = InvalidOid;
                                761             315 :     worker->subid = InvalidOid;
 2174 peter_e                   762 CBC         315 :     worker->relid = InvalidOid;
   81 akapila                   763 GNC         315 :     worker->leader_pid = InvalidPid;
   90                           764             315 :     worker->parallel_apply = false;
 2174 peter_e                   765 GIC         315 : }
 2174 peter_e                   766 ECB             : 
                                767                 : /*
 2181 fujii                     768                 :  * Cleanup function for logical replication launcher.
                                769                 :  *
                                770                 :  * Called on logical replication launcher exit.
                                771                 :  */
                                772                 : static void
 2181 fujii                     773 GIC         322 : logicalrep_launcher_onexit(int code, Datum arg)
 2181 fujii                     774 ECB             : {
 2181 fujii                     775 CBC         322 :     LogicalRepCtx->launcher_pid = 0;
 2181 fujii                     776 GIC         322 : }
 2181 fujii                     777 ECB             : 
 2271 peter_e                   778                 : /*
                                 779                 :  * Cleanup function for logical replication worker.
                                780                 :  *
                                781                 :  * Called on logical replication worker exit.
                                782                 :  */
                                783                 : static void
 2271 peter_e                   784 CBC         315 : logicalrep_worker_onexit(int code, Datum arg)
                                785                 : {
                                786                 :     /* Disconnect gracefully from the remote side. */
  697 alvherre                  787 GIC         315 :     if (LogRepWorkerWalRcvConn)
  697 alvherre                  788 CBC         289 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
                                789                 : 
 2271 peter_e                   790             315 :     logicalrep_worker_detach();
                                791                 : 
  584 akapila                   792 ECB             :     /* Cleanup fileset used for streaming transactions. */
  584 akapila                   793 CBC         315 :     if (MyLogicalRepWorker->stream_fileset != NULL)
  584 akapila                   794 GIC          14 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
  587 akapila                   795 ECB             : 
                                796                 :     /*
                                797                 :      * Session level locks may be acquired outside of a transaction in
                                798                 :      * parallel apply mode and will not be released when the worker
                                799                 :      * terminates, so manually release all locks before the worker exits.
                                800                 :      */
   90 akapila                   801 GNC         315 :     LockReleaseAll(DEFAULT_LOCKMETHOD, true);
                                802                 : 
 2138 peter_e                   803 CBC         315 :     ApplyLauncherWakeup();
 2271 peter_e                   804 GIC         315 : }
                                805                 : 
                                806                 : /*
                                807                 :  * Count the number of registered (not necessarily running) sync workers
                                808                 :  * for a subscription.
                                809                 :  */
                                810                 : int
 2208 peter_e                   811 CBC         977 : logicalrep_sync_worker_count(Oid subid)
                                812                 : {
 2153 bruce                     813 ECB             :     int         i;
 2153 bruce                     814 GIC         977 :     int         res = 0;
 2208 peter_e                   815 ECB             : 
 2208 peter_e                   816 CBC         977 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                817                 : 
                                 818                 :     /* Scan all worker slots, counting sync workers of the given subscription. */
 2208 peter_e                   819 GIC        5049 :     for (i = 0; i < max_logical_replication_workers; i++)
                                820                 :     {
 2153 bruce                     821            4072 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 2153 bruce                     822 ECB             : 
 2208 peter_e                   823 GIC        4072 :         if (w->subid == subid && OidIsValid(w->relid))
                                824            1208 :             res++;
 2208 peter_e                   825 ECB             :     }
                                826                 : 
 2208 peter_e                   827 CBC         977 :     return res;
 2208 peter_e                   828 ECB             : }
                                829                 : 
                                830                 : /*
                                831                 :  * Count the number of registered (but not necessarily running) parallel apply
                                832                 :  * workers for a subscription.
                                833                 :  */
                                834                 : static int
   90 akapila                   835 GNC         274 : logicalrep_pa_worker_count(Oid subid)
                                836                 : {
                                837                 :     int         i;
                                838             274 :     int         res = 0;
                                839                 : 
                                840             274 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                841                 : 
                                842                 :     /*
                                843                 :      * Scan all attached parallel apply workers, only counting those which
                                844                 :      * have the given subscription id.
                                845                 :      */
                                846            1458 :     for (i = 0; i < max_logical_replication_workers; i++)
                                847                 :     {
                                848            1184 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                849                 : 
                                850            1184 :         if (w->subid == subid && isParallelApplyWorker(w))
                                851               2 :             res++;
                                852                 :     }
                                853                 : 
                                854             274 :     return res;
                                855                 : }
                                856                 : 
 2271 peter_e                   857 ECB             : /*
                                858                 :  * ApplyLauncherShmemSize
 2271 peter_e                   859 EUB             :  *      Compute space needed for replication launcher shared memory
                                860                 :  */
                                861                 : Size
 2271 peter_e                   862 GIC        6390 : ApplyLauncherShmemSize(void)
                                863                 : {
                                864                 :     Size        size;
                                865                 : 
 2271 peter_e                   866 ECB             :     /*
                                867                 :      * Need the fixed struct and the array of LogicalRepWorker.
 2271 peter_e                   868 EUB             :      */
 2271 peter_e                   869 GBC        6390 :     size = sizeof(LogicalRepCtxStruct);
 2271 peter_e                   870 GIC        6390 :     size = MAXALIGN(size);
                                871            6390 :     size = add_size(size, mul_size(max_logical_replication_workers,
                                872                 :                                    sizeof(LogicalRepWorker)));
                                873            6390 :     return size;
                                874                 : }
 2271 peter_e                   875 ECB             : 
 2184 tgl                       876                 : /*
                                877                 :  * ApplyLauncherRegister
                                878                 :  *      Register a background worker running the logical replication launcher.
                                879                 :  */
                                880                 : void
 2271 peter_e                   881 GIC         598 : ApplyLauncherRegister(void)
                                882                 : {
                                883                 :     BackgroundWorker bgw;
                                884                 : 
                                885             598 :     if (max_logical_replication_workers == 0)
 2271 peter_e                   886 LBC           0 :         return;
                                887                 : 
 2184 tgl                       888 GIC         598 :     memset(&bgw, 0, sizeof(bgw));
 2153 bruce                     889 CBC         598 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                                890                 :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 2271 peter_e                   891 GIC         598 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 2200 rhaas                     892             598 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
                                893             598 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 2271 peter_e                   894             598 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
                                895                 :              "logical replication launcher");
 2047                           896             598 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
                                897                 :              "logical replication launcher");
 2271                           898             598 :     bgw.bgw_restart_time = 5;
                                899             598 :     bgw.bgw_notify_pid = 0;
 2271 peter_e                   900 CBC         598 :     bgw.bgw_main_arg = (Datum) 0;
                                901                 : 
                                902             598 :     RegisterBackgroundWorker(&bgw);
                                903                 : }
 2271 peter_e                   904 ECB             : 
                                905                 : /*
                                906                 :  * ApplyLauncherShmemInit
                                907                 :  *      Allocate and initialize replication launcher shared memory
                                908                 :  */
                                909                 : void
 2271 peter_e                   910 CBC        1826 : ApplyLauncherShmemInit(void)
                                911                 : {
                                912                 :     bool        found;
 2271 peter_e                   913 ECB             : 
 2271 peter_e                   914 GIC        1826 :     LogicalRepCtx = (LogicalRepCtxStruct *)
                                915            1826 :         ShmemInitStruct("Logical Replication Launcher Data",
                                916                 :                         ApplyLauncherShmemSize(),
 2271 peter_e                   917 ECB             :                         &found);
                                918                 : 
 2271 peter_e                   919 CBC        1826 :     if (!found)
                                920                 :     {
 2153 bruce                     921 ECB             :         int         slot;
 2208 peter_e                   922                 : 
 2271 peter_e                   923 GIC        1826 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
                                924                 : 
   74 tgl                       925 GNC        1826 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
                                926            1826 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
                                927                 : 
                                928                 :         /* Initialize memory and spin locks for each worker slot. */
 2208 peter_e                   929 GIC        9134 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
                                930                 :         {
 2208 peter_e                   931 CBC        7308 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
                                932                 : 
                                933            7308 :             memset(worker, 0, sizeof(LogicalRepWorker));
 2208 peter_e                   934 GIC        7308 :             SpinLockInit(&worker->relmutex);
 2208 peter_e                   935 ECB             :         }
                                936                 :     }
 2271 peter_e                   937 CBC        1826 : }
 2271 peter_e                   938 ECB             : 
                                939                 : /*
                                940                 :  * Initialize or attach to the dynamic shared hash table that stores the
                                941                 :  * last-start times, if not already done.
                                942                 :  * This must be called before accessing the table.
                                943                 :  */
                                944                 : static void
   77 tgl                       945 GNC         358 : logicalrep_launcher_attach_dshmem(void)
                                946                 : {
                                947                 :     MemoryContext oldcontext;
                                948                 : 
                                949                 :     /* Quick exit if we already did this. */
   74                           950             358 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
   77                           951             320 :         last_start_times != NULL)
                                952             241 :         return;
                                953                 : 
                                954                 :     /* Otherwise, use a lock to ensure only one process creates the table. */
                                955             117 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                956                 : 
                                957                 :     /* Be sure any local memory allocated by DSA routines is persistent. */
                                958             117 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
                                959                 : 
   74                           960             117 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
                                961                 :     {
                                962                 :         /* Initialize dynamic shared hash table for last-start times. */
   77                           963              38 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
                                964              38 :         dsa_pin(last_start_times_dsa);
                                965              38 :         dsa_pin_mapping(last_start_times_dsa);
                                966              38 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
                                967                 : 
                                968                 :         /* Store handles in shared memory for other backends to use. */
                                969              38 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
                                970              38 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
                                971                 :     }
                                972              79 :     else if (!last_start_times)
                                973                 :     {
                                974                 :         /* Attach to existing dynamic shared hash table. */
                                975              79 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
                                976              79 :         dsa_pin_mapping(last_start_times_dsa);
                                977              79 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
                                978              79 :                                          LogicalRepCtx->last_start_dsh, 0);
                                979                 :     }
                                980                 : 
                                981             117 :     MemoryContextSwitchTo(oldcontext);
                                982             117 :     LWLockRelease(LogicalRepWorkerLock);
                                983                 : }
                                984                 : 
                                985                 : /*
                                986                 :  * Set the last-start time for the subscription.
                                987                 :  */
                                988                 : static void
                                989             111 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
                                990                 : {
                                991                 :     LauncherLastStartTimesEntry *entry;
                                992                 :     bool        found;
                                993                 : 
                                994             111 :     logicalrep_launcher_attach_dshmem();
                                995                 : 
                                996             111 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
                                997             111 :     entry->last_start_time = start_time;
                                998             111 :     dshash_release_lock(last_start_times, entry);
                                999             111 : }
                               1000                 : 
                               1001                 : /*
                               1002                 :  * Return the last-start time for the subscription, or 0 if there isn't one.
                               1003                 :  */
                               1004                 : static TimestampTz
                               1005             139 : ApplyLauncherGetWorkerStartTime(Oid subid)
                               1006                 : {
                               1007                 :     LauncherLastStartTimesEntry *entry;
                               1008                 :     TimestampTz ret;
                               1009                 : 
                               1010             139 :     logicalrep_launcher_attach_dshmem();
                               1011                 : 
                               1012             139 :     entry = dshash_find(last_start_times, &subid, false);
                               1013             139 :     if (entry == NULL)
                               1014              93 :         return 0;
                               1015                 : 
                               1016              46 :     ret = entry->last_start_time;
                               1017              46 :     dshash_release_lock(last_start_times, entry);
                               1018                 : 
                               1019              46 :     return ret;
                               1020                 : }
                               1021                 : 
                               1022                 : /*
                               1023                 :  * Remove the last-start-time entry for the subscription, if one exists.
                               1024                 :  *
                               1025                 :  * This has two use-cases: to remove the entry related to a subscription
                               1026                 :  * that's been deleted or disabled (just to avoid leaking shared memory),
                               1027                 :  * and to allow immediate restart of an apply worker that has exited
                               1028                 :  * due to subscription parameter changes.
                               1029                 :  */
                               1030                 : void
                               1031             108 : ApplyLauncherForgetWorkerStartTime(Oid subid)
                               1032                 : {
                               1033             108 :     logicalrep_launcher_attach_dshmem();
                               1034                 : 
                               1035             108 :     (void) dshash_delete_key(last_start_times, &subid);
                               1036             108 : }
                               1037                 : 
 2271 peter_e                  1038 ECB             : /*
                                1039                 :  * Wake up the launcher on commit if requested.
                               1040                 :  */
                               1041                 : void
 2169 peter_e                  1042 CBC      485915 : AtEOXact_ApplyLauncher(bool isCommit)
                               1043                 : {
 2074 peter_e                  1044 GIC      485915 :     if (isCommit)
                               1045                 :     {
                               1046          465429 :         if (on_commit_launcher_wakeup)
                               1047              83 :             ApplyLauncherWakeup();
                               1048                 :     }
                               1049                 : 
 2169 peter_e                  1050 CBC      485915 :     on_commit_launcher_wakeup = false;
 2271 peter_e                  1051 GIC      485915 : }
 2271 peter_e                  1052 ECB             : 
                               1053                 : /*
                               1054                 :  * Request wakeup of the launcher on commit of the transaction.
                               1055                 :  *
                                1056                 :  * This is used to signal the launcher to stop sleeping and process the
                                1057                 :  * subscriptions when the current transaction commits. It should be used when
                                1058                 :  * a new tuple has been added to the pg_subscription catalog.
                                1059                 :  */
                               1060                 : void
 2271 peter_e                  1061 CBC          83 : ApplyLauncherWakeupAtCommit(void)
                               1062                 : {
 2253 heikki.linnakangas       1063 GIC          83 :     if (!on_commit_launcher_wakeup)
 2253 heikki.linnakangas       1064 CBC          83 :         on_commit_launcher_wakeup = true;
 2271 peter_e                  1065              83 : }
                               1066                 : 
 2181 fujii                    1067 ECB             : static void
 2271 peter_e                  1068 GIC         398 : ApplyLauncherWakeup(void)
                               1069                 : {
 2181 fujii                    1070 CBC         398 :     if (LogicalRepCtx->launcher_pid != 0)
 2271 peter_e                  1071             381 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
 2271 peter_e                  1072 GIC         398 : }
                               1073                 : 
                               1074                 : /*
                               1075                 :  * Main loop for the apply launcher process.
                               1076                 :  */
                               1077                 : void
 2271 peter_e                  1078 CBC         322 : ApplyLauncherMain(Datum main_arg)
                               1079                 : {
 2221 tgl                      1080 GIC         322 :     ereport(DEBUG1,
                               1081                 :             (errmsg_internal("logical replication launcher started")));
                               1082                 : 
 2181 fujii                    1083             322 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
                               1084                 : 
 2131 andres                   1085             322 :     Assert(LogicalRepCtx->launcher_pid == 0);
 2131 andres                   1086 CBC         322 :     LogicalRepCtx->launcher_pid = MyProcPid;
                               1087                 : 
                               1088                 :     /* Establish signal handlers. */
  996 akapila                  1089             322 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
 2131 andres                   1090 GIC         322 :     pqsignal(SIGTERM, die);
 2271 peter_e                  1091 CBC         322 :     BackgroundWorkerUnblockSignals();
                               1092                 : 
                               1093                 :     /*
 2271 peter_e                  1094 ECB             :      * Establish connection to nailed catalogs (we only ever access
                               1095                 :      * pg_subscription).
                               1096                 :      */
 1830 magnus                   1097 GIC         322 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 2271 peter_e                  1098 ECB             : 
                               1099                 :     /* Enter main loop */
                               1100                 :     for (;;)
 2271 peter_e                  1101 GIC        1715 :     {
 2271 peter_e                  1102 ECB             :         int         rc;
                               1103                 :         List       *sublist;
                               1104                 :         ListCell   *lc;
                               1105                 :         MemoryContext subctx;
                               1106                 :         MemoryContext oldctx;
 2153 bruce                    1107 GIC        2037 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
                               1108                 : 
 2131 andres                   1109 CBC        2037 :         CHECK_FOR_INTERRUPTS();
                               1110                 : 
                               1111                 :         /* Use temporary context to avoid leaking memory across cycles. */
   77 tgl                      1112 GNC        2036 :         subctx = AllocSetContextCreate(TopMemoryContext,
                               1113                 :                                        "Logical Replication Launcher sublist",
                               1114                 :                                        ALLOCSET_DEFAULT_SIZES);
                               1115            2036 :         oldctx = MemoryContextSwitchTo(subctx);
 2271 peter_e                  1116 ECB             : 
                               1117                 :         /* Start any missing workers for enabled subscriptions. */
   77 tgl                      1118 GNC        2036 :         sublist = get_subscription_list();
                               1119            2412 :         foreach(lc, sublist)
                               1120                 :         {
                               1121             376 :             Subscription *sub = (Subscription *) lfirst(lc);
                               1122                 :             LogicalRepWorker *w;
                               1123                 :             TimestampTz last_start;
                               1124                 :             TimestampTz now;
                               1125                 :             long        elapsed;
 2271 peter_e                  1126 ECB             : 
   77 tgl                      1127 GNC         376 :             if (!sub->enabled)
                               1128              22 :                 continue;
 2063 peter_e                  1129 ECB             : 
   77 tgl                      1130 GNC         354 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1131             354 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
                               1132             354 :             LWLockRelease(LogicalRepWorkerLock);
                               1133                 : 
                               1134             354 :             if (w != NULL)
                               1135             215 :                 continue;       /* worker is running already */
                               1136                 : 
                               1137                 :             /*
                               1138                 :              * If the worker is eligible to start now, launch it.  Otherwise,
                               1139                 :              * adjust wait_time so that we'll wake up as soon as it can be
                               1140                 :              * started.
                               1141                 :              *
                               1142                 :              * Each subscription's apply worker can only be restarted once per
                               1143                 :              * wal_retrieve_retry_interval, so that errors do not cause us to
                               1144                 :              * repeatedly restart the worker as fast as possible.  In cases
                               1145                 :              * where a restart is expected (e.g., subscription parameter
                               1146                 :              * changes), another process should remove the last-start entry
                               1147                 :              * for the subscription so that the worker can be restarted
                               1148                 :              * without waiting for wal_retrieve_retry_interval to elapse.
                               1149                 :              */
                               1150             139 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
                               1151             139 :             now = GetCurrentTimestamp();
                               1152             139 :             if (last_start == 0 ||
                               1153              46 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
                               1154                 :             {
                               1155             111 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
                               1156             111 :                 logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
                               1157                 :                                          sub->owner, InvalidOid,
                               1158                 :                                          DSM_HANDLE_INVALID);
                               1159                 :             }
                               1160                 :             else
                               1161                 :             {
                               1162              28 :                 wait_time = Min(wait_time,
                               1163                 :                                 wal_retrieve_retry_interval - elapsed);
                               1164                 :             }
 2271 peter_e                  1165 ECB             :         }
                               1166                 : 
                               1167                 :         /* Switch back to original memory context. */
   77 tgl                      1168 GNC        2036 :         MemoryContextSwitchTo(oldctx);
                               1169                 :         /* Clean the temporary memory. */
                               1170            2036 :         MemoryContextDelete(subctx);
                               1171                 : 
 2271 peter_e                  1172 ECB             :         /* Wait for more work. */
 2133 andres                   1173 CBC        2036 :         rc = WaitLatch(MyLatch,
 1598 tmunro                   1174 ECB             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 2271 peter_e                  1175                 :                        wait_time,
                               1176                 :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
                               1177                 : 
 2133 andres                   1178 GIC        2033 :         if (rc & WL_LATCH_SET)
 2133 andres                   1179 ECB             :         {
 2133 andres                   1180 CBC        2023 :             ResetLatch(MyLatch);
                               1181            2023 :             CHECK_FOR_INTERRUPTS();
                               1182                 :         }
 2133 andres                   1183 ECB             : 
 1209 rhaas                    1184 GIC        1715 :         if (ConfigReloadPending)
                               1185                 :         {
                               1186              25 :             ConfigReloadPending = false;
 2190 peter_e                  1187              25 :             ProcessConfigFile(PGC_SIGHUP);
                               1188                 :         }
                               1189                 :     }
                               1190                 : 
 2131 andres                   1191 ECB             :     /* Not reachable */
                               1192                 : }
                               1193                 : 
                               1194                 : /*
                               1195                 :  * Is current process the logical replication launcher?
                               1196                 :  */
                               1197                 : bool
 2131 andres                   1198 GIC         336 : IsLogicalLauncher(void)
                               1199                 : {
 2131 andres                   1200 CBC         336 :     return LogicalRepCtx->launcher_pid == MyProcPid;
                               1201                 : }
                               1202                 : 
                               1203                 : /*
                               1204                 :  * Return the pid of the leader apply worker if the given pid is the pid of a
                               1205                 :  * parallel apply worker, otherwise, return InvalidPid.
                               1206                 :  */
                               1207                 : pid_t
   81 akapila                  1208 GNC         580 : GetLeaderApplyWorkerPid(pid_t pid)
                               1209                 : {
                               1210             580 :     int         leader_pid = InvalidPid;
                               1211                 :     int         i;
                               1212                 : 
                               1213             580 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1214                 : 
                               1215            2900 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1216                 :     {
                               1217            2320 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                               1218                 : 
                               1219            2320 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
                               1220                 :         {
   81 akapila                  1221 UNC           0 :             leader_pid = w->leader_pid;
                               1222               0 :             break;
                               1223                 :         }
                               1224                 :     }
                               1225                 : 
   81 akapila                  1226 GNC         580 :     LWLockRelease(LogicalRepWorkerLock);
                               1227                 : 
                               1228             580 :     return leader_pid;
                               1229                 : }
                               1230                 : 
                               1231                 : /*
 2271 peter_e                  1232 ECB             :  * Returns state of the subscriptions.
                               1233                 :  */
                               1234                 : Datum
 2271 peter_e                  1235 CBC           1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
                               1236                 : {
                               1237                 : #define PG_STAT_GET_SUBSCRIPTION_COLS   9
                               1238               1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
                               1239                 :     int         i;
                               1240               1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1241                 : 
  173 michael                  1242               1 :     InitMaterializedSRF(fcinfo, 0);
 2271 peter_e                  1243 ECB             : 
                               1244                 :     /* Make sure we get consistent view of the workers. */
 2271 peter_e                  1245 GIC           1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 2271 peter_e                  1246 ECB             : 
  306 tgl                      1247 GIC           5 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1248                 :     {
                               1249                 :         /* for each row */
  267 peter                    1250 GNC           4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1251               4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1252                 :         int         worker_pid;
                               1253                 :         LogicalRepWorker worker;
 2271 peter_e                  1254 ECB             : 
 2271 peter_e                  1255 GIC           4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
                               1256                 :                sizeof(LogicalRepWorker));
                               1257               4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
                               1258               2 :             continue;
 2271 peter_e                  1259 ECB             : 
 2271 peter_e                  1260 CBC           2 :         if (OidIsValid(subid) && worker.subid != subid)
 2271 peter_e                  1261 LBC           0 :             continue;
                               1262                 : 
 2271 peter_e                  1263 GIC           2 :         worker_pid = worker.proc->pid;
 2271 peter_e                  1264 ECB             : 
 2271 peter_e                  1265 GIC           2 :         values[0] = ObjectIdGetDatum(worker.subid);
 2208 peter_e                  1266 CBC           2 :         if (OidIsValid(worker.relid))
 2208 peter_e                  1267 UIC           0 :             values[1] = ObjectIdGetDatum(worker.relid);
                               1268                 :         else
 2208 peter_e                  1269 CBC           2 :             nulls[1] = true;
                               1270               2 :         values[2] = Int32GetDatum(worker_pid);
                               1271                 : 
   81 akapila                  1272 GNC           2 :         if (isParallelApplyWorker(&worker))
   81 akapila                  1273 UNC           0 :             values[3] = Int32GetDatum(worker.leader_pid);
                               1274                 :         else
 2208 peter_e                  1275 GNC           2 :             nulls[3] = true;
                               1276                 : 
   81 akapila                  1277               2 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
   81 akapila                  1278 CBC           1 :             nulls[4] = true;
                               1279                 :         else
   81 akapila                  1280 GNC           1 :             values[4] = LSNGetDatum(worker.last_lsn);
 2271 peter_e                  1281               2 :         if (worker.last_send_time == 0)
   81 akapila                  1282 UIC           0 :             nulls[5] = true;
 2271 peter_e                  1283 ECB             :         else
   81 akapila                  1284 GNC           2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
 2271 peter_e                  1285               2 :         if (worker.last_recv_time == 0)
   81 akapila                  1286 LBC           0 :             nulls[6] = true;
                               1287                 :         else
   81 akapila                  1288 GNC           2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
 2271 peter_e                  1289               2 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
   81 akapila                  1290 CBC           1 :             nulls[7] = true;
                               1291                 :         else
   81 akapila                  1292 GNC           1 :             values[7] = LSNGetDatum(worker.reply_lsn);
 2271 peter_e                  1293               2 :         if (worker.reply_time == 0)
   81 akapila                  1294 UNC           0 :             nulls[8] = true;
                               1295                 :         else
   81 akapila                  1296 GNC           2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
                               1297                 : 
  398 michael                  1298 GIC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                               1299                 :                              values, nulls);
                               1300                 : 
 2153 bruce                    1301 ECB             :         /*
                               1302                 :          * If only a single subscription was requested, and we found it,
                               1303                 :          * break.
                               1304                 :          */
 2271 peter_e                  1305 GIC           2 :         if (OidIsValid(subid))
 2271 peter_e                  1306 LBC           0 :             break;
                               1307                 :     }
 2271 peter_e                  1308 ECB             : 
 2271 peter_e                  1309 CBC           1 :     LWLockRelease(LogicalRepWorkerLock);
 2271 peter_e                  1310 ECB             : 
 2271 peter_e                  1311 CBC           1 :     return (Datum) 0;
                               1312                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a