LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC DCB
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 87.6 % 477 418 10 1 48 1 39 378 25
Current Date: 2024-04-14 14:21:10 Functions: 100.0 % 31 31 8 23 1
Baseline: 16@8cea358b128 Branches: 64.0 % 308 197 17 4 6 84 4 6 27 160
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed [..60] days: 100.0 % 1 1 1
(60,120] days: 100.0 % 1 1 1
(180,240] days: 70.6 % 34 24 10 24
(240..) days: 88.9 % 441 392 1 48 1 13 378
Function coverage date bins:
(240..) days: 100.0 % 31 31 8 23
Branch coverage date bins:
(60,120] days: 100.0 % 4 4 4
(180,240] days: 57.7 % 26 15 11 15
(240..) days: 64.0 % 278 178 6 4 6 84 4 6 8 160

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * launcher.c
                                  3                 :                :  *     PostgreSQL logical replication worker launcher process
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2016-2024, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/launcher.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This module contains the logical replication worker launcher which
                                 12                 :                :  *    uses the background worker infrastructure to start the logical
                                 13                 :                :  *    replication workers for every enabled subscription.
                                 14                 :                :  *
                                 15                 :                :  *-------------------------------------------------------------------------
                                 16                 :                :  */
                                 17                 :                : 
                                 18                 :                : #include "postgres.h"
                                 19                 :                : 
                                 20                 :                : #include "access/heapam.h"
                                 21                 :                : #include "access/htup.h"
                                 22                 :                : #include "access/htup_details.h"
                                 23                 :                : #include "access/tableam.h"
                                 24                 :                : #include "access/xact.h"
                                 25                 :                : #include "catalog/pg_subscription.h"
                                 26                 :                : #include "catalog/pg_subscription_rel.h"
                                 27                 :                : #include "funcapi.h"
                                 28                 :                : #include "lib/dshash.h"
                                 29                 :                : #include "miscadmin.h"
                                 30                 :                : #include "pgstat.h"
                                 31                 :                : #include "postmaster/bgworker.h"
                                 32                 :                : #include "postmaster/interrupt.h"
                                 33                 :                : #include "replication/logicallauncher.h"
                                 34                 :                : #include "replication/slot.h"
                                 35                 :                : #include "replication/walreceiver.h"
                                 36                 :                : #include "replication/worker_internal.h"
                                 37                 :                : #include "storage/ipc.h"
                                 38                 :                : #include "storage/proc.h"
                                 39                 :                : #include "storage/procarray.h"
                                 40                 :                : #include "tcop/tcopprot.h"
                                 41                 :                : #include "utils/builtins.h"
                                 42                 :                : #include "utils/memutils.h"
                                 43                 :                : #include "utils/pg_lsn.h"
                                 44                 :                : #include "utils/snapmgr.h"
                                 45                 :                : 
                                 46                 :                : /* max sleep time between cycles (3min) */
                                 47                 :                : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
                                 48                 :                : 
                                 49                 :                : /* GUC variables */
                                 50                 :                : int         max_logical_replication_workers = 4;
                                 51                 :                : int         max_sync_workers_per_subscription = 2;
                                 52                 :                : int         max_parallel_apply_workers_per_subscription = 2;
                                 53                 :                : 
                                 54                 :                : LogicalRepWorker *MyLogicalRepWorker = NULL;
                                 55                 :                : 
                                 56                 :                : typedef struct LogicalRepCtxStruct
                                 57                 :                : {
                                 58                 :                :     /* Supervisor process. */
                                 59                 :                :     pid_t       launcher_pid;
                                 60                 :                : 
                                 61                 :                :     /* Hash table holding last start times of subscriptions' apply workers. */
                                 62                 :                :     dsa_handle  last_start_dsa;
                                 63                 :                :     dshash_table_handle last_start_dsh;
                                 64                 :                : 
                                 65                 :                :     /* Background workers. */
                                 66                 :                :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
                                 67                 :                : } LogicalRepCtxStruct;
                                 68                 :                : 
                                 69                 :                : static LogicalRepCtxStruct *LogicalRepCtx;
                                 70                 :                : 
                                 71                 :                : /* an entry in the last-start-times shared hash table */
                                 72                 :                : typedef struct LauncherLastStartTimesEntry
                                 73                 :                : {
                                 74                 :                :     Oid         subid;          /* OID of logrep subscription (hash key) */
                                 75                 :                :     TimestampTz last_start_time;    /* last time its apply worker was started */
                                 76                 :                : } LauncherLastStartTimesEntry;
                                 77                 :                : 
                                 78                 :                : /* parameters for the last-start-times shared hash table */
                                 79                 :                : static const dshash_parameters dsh_params = {
                                 80                 :                :     sizeof(Oid),
                                 81                 :                :     sizeof(LauncherLastStartTimesEntry),
                                 82                 :                :     dshash_memcmp,
                                 83                 :                :     dshash_memhash,
                                 84                 :                :     dshash_memcpy,
                                 85                 :                :     LWTRANCHE_LAUNCHER_HASH
                                 86                 :                : };
                                 87                 :                : 
                                 88                 :                : static dsa_area *last_start_times_dsa = NULL;
                                 89                 :                : static dshash_table *last_start_times = NULL;
                                 90                 :                : 
                                 91                 :                : static bool on_commit_launcher_wakeup = false;
                                 92                 :                : 
                                 93                 :                : 
                                 94                 :                : static void ApplyLauncherWakeup(void);
                                 95                 :                : static void logicalrep_launcher_onexit(int code, Datum arg);
                                 96                 :                : static void logicalrep_worker_onexit(int code, Datum arg);
                                 97                 :                : static void logicalrep_worker_detach(void);
                                 98                 :                : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
                                 99                 :                : static int  logicalrep_pa_worker_count(Oid subid);
                                100                 :                : static void logicalrep_launcher_attach_dshmem(void);
                                101                 :                : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
                                102                 :                : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
                                103                 :                : 
                                104                 :                : 
                                105                 :                : /*
                                106                 :                :  * Load the list of subscriptions.
                                107                 :                :  *
                                108                 :                :  * Only the fields interesting for worker start/stop functions are filled for
                                109                 :                :  * each subscription.
                                110                 :                :  */
                                111                 :                : static List *
 2642 peter_e@gmx.net           112                 :CBC        3124 : get_subscription_list(void)
                                113                 :                : {
                                114                 :           3124 :     List       *res = NIL;
                                115                 :                :     Relation    rel;
                                116                 :                :     TableScanDesc scan;
                                117                 :                :     HeapTuple   tup;
                                118                 :                :     MemoryContext resultcxt;
                                119                 :                : 
                                120                 :                :     /* This is the context that we will allocate our output data in */
                                121                 :           3124 :     resultcxt = CurrentMemoryContext;
                                122                 :                : 
                                123                 :                :     /*
                                124                 :                :      * Start a transaction so we can access pg_database, and get a snapshot.
                                125                 :                :      * We don't have a use for the snapshot itself, but we're interested in
                                126                 :                :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
                                127                 :                :      * for anything that reads heap pages, because HOT may decide to prune
                                128                 :                :      * them even if the process doesn't attempt to modify any tuples.)
                                129                 :                :      *
                                130                 :                :      * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
                                131                 :                :      * not pushed/active does not reliably prevent HOT pruning (->xmin could
                                132                 :                :      * e.g. be cleared when cache invalidations are processed).
                                133                 :                :      */
                                134                 :           3124 :     StartTransactionCommand();
                                135                 :           3124 :     (void) GetTransactionSnapshot();
                                136                 :                : 
 1910 andres@anarazel.de        137                 :           3124 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
 1861                           138                 :           3124 :     scan = table_beginscan_catalog(rel, 0, NULL);
                                139                 :                : 
 2642 peter_e@gmx.net           140         [ +  + ]:           4139 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
                                141                 :                :     {
                                142                 :           1015 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
                                143                 :                :         Subscription *sub;
                                144                 :                :         MemoryContext oldcxt;
                                145                 :                : 
                                146                 :                :         /*
                                147                 :                :          * Allocate our results in the caller's context, not the
                                148                 :                :          * transaction's. We do this inside the loop, and restore the original
                                149                 :                :          * context at the end, so that leaky things like heap_getnext() are
                                150                 :                :          * not called in a potentially long-lived context.
                                151                 :                :          */
                                152                 :           1015 :         oldcxt = MemoryContextSwitchTo(resultcxt);
                                153                 :                : 
 2557                           154                 :           1015 :         sub = (Subscription *) palloc0(sizeof(Subscription));
 1972 andres@anarazel.de        155                 :           1015 :         sub->oid = subform->oid;
 2642 peter_e@gmx.net           156                 :           1015 :         sub->dbid = subform->subdbid;
                                157                 :           1015 :         sub->owner = subform->subowner;
                                158                 :           1015 :         sub->enabled = subform->subenabled;
                                159                 :           1015 :         sub->name = pstrdup(NameStr(subform->subname));
                                160                 :                :         /* We don't fill fields we are not interested in. */
                                161                 :                : 
                                162                 :           1015 :         res = lappend(res, sub);
                                163                 :           1015 :         MemoryContextSwitchTo(oldcxt);
                                164                 :                :     }
                                165                 :                : 
 1861 andres@anarazel.de        166                 :           3124 :     table_endscan(scan);
 1910                           167                 :           3124 :     table_close(rel, AccessShareLock);
                                168                 :                : 
 2642 peter_e@gmx.net           169                 :           3124 :     CommitTransactionCommand();
                                170                 :                : 
                                171                 :           3124 :     return res;
                                172                 :                : }
                                173                 :                : 
                                174                 :                : /*
                                175                 :                :  * Wait for a background worker to start up and attach to the shmem context.
                                176                 :                :  *
                                177                 :                :  * This is only needed for cleaning up the shared memory in case the worker
                                178                 :                :  * fails to attach.
                                179                 :                :  *
                                180                 :                :  * Returns whether the attach was successful.
                                181                 :                :  */
                                182                 :                : static bool
                                183                 :            455 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                184                 :                :                                uint16 generation,
                                185                 :                :                                BackgroundWorkerHandle *handle)
                                186                 :                : {
                                187                 :                :     BgwHandleStatus status;
                                188                 :                :     int         rc;
                                189                 :                : 
                                190                 :                :     for (;;)
                                191                 :            956 :     {
                                192                 :                :         pid_t       pid;
                                193                 :                : 
                                194         [ -  + ]:           1411 :         CHECK_FOR_INTERRUPTS();
                                195                 :                : 
 2545                           196                 :           1411 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                197                 :                : 
                                198                 :                :         /* Worker either died or has started. Return false if died. */
                                199   [ +  +  +  + ]:           1411 :         if (!worker->in_use || worker->proc)
                                200                 :                :         {
                                201                 :            453 :             LWLockRelease(LogicalRepWorkerLock);
  461 akapila@postgresql.o      202                 :            453 :             return worker->in_use;
                                203                 :                :         }
                                204                 :                : 
 2545 peter_e@gmx.net           205                 :            958 :         LWLockRelease(LogicalRepWorkerLock);
                                206                 :                : 
                                207                 :                :         /* Check if worker has died before attaching, and clean up after it. */
 2642                           208                 :            958 :         status = GetBackgroundWorkerPid(handle, &pid);
                                209                 :                : 
                                210         [ -  + ]:            958 :         if (status == BGWH_STOPPED)
                                211                 :                :         {
 2545 peter_e@gmx.net           212                 :UBC           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                213                 :                :             /* Ensure that this was indeed the worker we waited for. */
                                214         [ #  # ]:              0 :             if (generation == worker->generation)
                                215                 :              0 :                 logicalrep_worker_cleanup(worker);
                                216                 :              0 :             LWLockRelease(LogicalRepWorkerLock);
  461 akapila@postgresql.o      217                 :              0 :             return false;
                                218                 :                :         }
                                219                 :                : 
                                220                 :                :         /*
                                221                 :                :          * We need timeout because we generally don't get notified via latch
                                222                 :                :          * about the worker attach.  But we don't expect to have to wait long.
                                223                 :                :          */
 2642 peter_e@gmx.net           224                 :CBC         958 :         rc = WaitLatch(MyLatch,
                                225                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                226                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                227                 :                : 
 2504 andres@anarazel.de        228         [ +  + ]:            956 :         if (rc & WL_LATCH_SET)
                                229                 :                :         {
                                230                 :            534 :             ResetLatch(MyLatch);
                                231         [ -  + ]:            534 :             CHECK_FOR_INTERRUPTS();
                                232                 :                :         }
                                233                 :                :     }
                                234                 :                : }
                                235                 :                : 
                                236                 :                : /*
                                237                 :                :  * Walks the workers array and searches for one that matches given
                                238                 :                :  * subscription id and relid.
                                239                 :                :  *
                                240                 :                :  * We are only interested in the leader apply worker or table sync worker.
                                241                 :                :  */
                                242                 :                : LogicalRepWorker *
 2579 peter_e@gmx.net           243                 :           2818 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
                                244                 :                : {
                                245                 :                :     int         i;
 2524 bruce@momjian.us          246                 :           2818 :     LogicalRepWorker *res = NULL;
                                247                 :                : 
 2642 peter_e@gmx.net           248         [ -  + ]:           2818 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                249                 :                : 
                                250                 :                :     /* Search for attached worker for a given subscription id. */
                                251         [ +  + ]:          10059 :     for (i = 0; i < max_logical_replication_workers; i++)
                                252                 :                :     {
 2524 bruce@momjian.us          253                 :           8719 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                254                 :                : 
                                255                 :                :         /* Skip parallel apply workers. */
  461 akapila@postgresql.o      256   [ +  +  -  + ]:           8719 :         if (isParallelApplyWorker(w))
  461 akapila@postgresql.o      257                 :LBC      (1434) :             continue;
                                258                 :                : 
 2545 peter_e@gmx.net           259   [ +  +  +  +  :CBC        8719 :         if (w->in_use && w->subid == subid && w->relid == relid &&
                                              +  + ]
                                260   [ +  +  +  - ]:           1478 :             (!only_running || w->proc))
                                261                 :                :         {
 2642                           262                 :           1478 :             res = w;
                                263                 :           1478 :             break;
                                264                 :                :         }
                                265                 :                :     }
                                266                 :                : 
                                267                 :           2818 :     return res;
                                268                 :                : }
                                269                 :                : 
                                270                 :                : /*
                                271                 :                :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
                                272                 :                :  * the subscription, instead of just one.
                                273                 :                :  */
                                274                 :                : List *
 2445                           275                 :            524 : logicalrep_workers_find(Oid subid, bool only_running)
                                276                 :                : {
                                277                 :                :     int         i;
                                278                 :            524 :     List       *res = NIL;
                                279                 :                : 
                                280         [ -  + ]:            524 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                281                 :                : 
                                282                 :                :     /* Search for attached worker for a given subscription id. */
                                283         [ +  + ]:           2820 :     for (i = 0; i < max_logical_replication_workers; i++)
                                284                 :                :     {
                                285                 :           2296 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                286                 :                : 
                                287   [ +  +  +  +  :           2296 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
                                        +  +  +  + ]
                                288                 :            372 :             res = lappend(res, w);
                                289                 :                :     }
                                290                 :                : 
                                291                 :            524 :     return res;
                                292                 :                : }
                                293                 :                : 
                                294                 :                : /*
                                295                 :                :  * Start new logical replication background worker, if possible.
                                296                 :                :  *
                                297                 :                :  * Returns true on success, false on failure.
                                298                 :                :  */
                                299                 :                : bool
  244 akapila@postgresql.o      300                 :GNC         459 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                301                 :                :                          Oid dbid, Oid subid, const char *subname, Oid userid,
                                302                 :                :                          Oid relid, dsm_handle subworker_dsm)
                                303                 :                : {
                                304                 :                :     BackgroundWorker bgw;
                                305                 :                :     BackgroundWorkerHandle *bgw_handle;
                                306                 :                :     uint16      generation;
                                307                 :                :     int         i;
 2524 bruce@momjian.us          308                 :CBC         459 :     int         slot = 0;
                                309                 :            459 :     LogicalRepWorker *worker = NULL;
                                310                 :                :     int         nsyncworkers;
                                311                 :                :     int         nparallelapplyworkers;
                                312                 :                :     TimestampTz now;
  244 akapila@postgresql.o      313                 :GNC         459 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
                                314                 :            459 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
                                315                 :                : 
                                316                 :                :     /*----------
                                317                 :                :      * Sanity checks:
                                318                 :                :      * - must be valid worker type
                                319                 :                :      * - tablesync workers are only ones to have relid
                                320                 :                :      * - parallel apply worker is the only kind of subworker
                                321                 :                :      */
                                322         [ -  + ]:            459 :     Assert(wtype != WORKERTYPE_UNKNOWN);
                                323         [ -  + ]:            459 :     Assert(is_tablesync_worker == OidIsValid(relid));
                                324         [ -  + ]:            459 :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
                                325                 :                : 
 2517 peter_e@gmx.net           326         [ +  + ]:CBC         459 :     ereport(DEBUG1,
                                327                 :                :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
                                328                 :                :                              subname)));
                                329                 :                : 
                                330                 :                :     /* Report this after the initial starting message for consistency. */
 2642                           331         [ -  + ]:            459 :     if (max_replication_slots == 0)
 2642 peter_e@gmx.net           332         [ #  # ]:UBC           0 :         ereport(ERROR,
                                333                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                334                 :                :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
                                335                 :                : 
                                336                 :                :     /*
                                337                 :                :      * We need to do the modification of the shared memory under lock so that
                                338                 :                :      * we have consistent view.
                                339                 :                :      */
 2642 peter_e@gmx.net           340                 :CBC         459 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                341                 :                : 
 2545                           342                 :            459 : retry:
                                343                 :                :     /* Find unused worker slot. */
                                344         [ +  - ]:            753 :     for (i = 0; i < max_logical_replication_workers; i++)
                                345                 :                :     {
                                346                 :            753 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                347                 :                : 
                                348         [ +  + ]:            753 :         if (!w->in_use)
                                349                 :                :         {
                                350                 :            459 :             worker = w;
                                351                 :            459 :             slot = i;
 2642                           352                 :            459 :             break;
                                353                 :                :         }
                                354                 :                :     }
                                355                 :                : 
 2545                           356                 :            459 :     nsyncworkers = logicalrep_sync_worker_count(subid);
                                357                 :                : 
                                358                 :            459 :     now = GetCurrentTimestamp();
                                359                 :                : 
                                360                 :                :     /*
                                361                 :                :      * If we didn't find a free slot, try to do garbage collection.  The
                                362                 :                :      * reason we do this is because if some worker failed to start up and its
                                363                 :                :      * parent has crashed while waiting, the in_use state was never cleared.
                                364                 :                :      */
                                365   [ +  -  -  + ]:            459 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
                                366                 :                :     {
 2524 bruce@momjian.us          367                 :UBC           0 :         bool        did_cleanup = false;
                                368                 :                : 
 2545 peter_e@gmx.net           369         [ #  # ]:              0 :         for (i = 0; i < max_logical_replication_workers; i++)
                                370                 :                :         {
                                371                 :              0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                372                 :                : 
                                373                 :                :             /*
                                374                 :                :              * If the worker was marked in use but didn't manage to attach in
                                375                 :                :              * time, clean it up.
                                376                 :                :              */
                                377   [ #  #  #  #  :              0 :             if (w->in_use && !w->proc &&
                                              #  # ]
                                378                 :              0 :                 TimestampDifferenceExceeds(w->launch_time, now,
                                379                 :                :                                            wal_receiver_timeout))
                                380                 :                :             {
                                381         [ #  # ]:              0 :                 elog(WARNING,
                                382                 :                :                      "logical replication worker for subscription %u took too long to start; canceled",
                                383                 :                :                      w->subid);
                                384                 :                : 
                                385                 :              0 :                 logicalrep_worker_cleanup(w);
                                386                 :              0 :                 did_cleanup = true;
                                387                 :                :             }
                                388                 :                :         }
                                389                 :                : 
                                390         [ #  # ]:              0 :         if (did_cleanup)
                                391                 :              0 :             goto retry;
                                392                 :                :     }
                                393                 :                : 
                                394                 :                :     /*
                                395                 :                :      * We don't allow to invoke more sync workers once we have reached the
                                396                 :                :      * sync worker limit per subscription. So, just return silently as we
                                397                 :                :      * might get here because of an otherwise harmless race condition.
                                398                 :                :      */
  244 akapila@postgresql.o      399   [ +  +  -  + ]:GNC         459 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
                                400                 :                :     {
 2545 peter_e@gmx.net           401                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
  461 akapila@postgresql.o      402                 :              0 :         return false;
                                403                 :                :     }
                                404                 :                : 
  461 akapila@postgresql.o      405                 :CBC         459 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
                                406                 :                : 
                                407                 :                :     /*
                                408                 :                :      * Return false if the number of parallel apply workers reached the limit
                                409                 :                :      * per subscription.
                                410                 :                :      */
                                411         [ +  + ]:            459 :     if (is_parallel_apply_worker &&
                                412         [ -  + ]:             11 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
                                413                 :                :     {
  461 akapila@postgresql.o      414                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                415                 :              0 :         return false;
                                416                 :                :     }
                                417                 :                : 
                                418                 :                :     /*
                                419                 :                :      * However if there are no more free worker slots, inform user about it
                                420                 :                :      * before exiting.
                                421                 :                :      */
 2642 peter_e@gmx.net           422         [ -  + ]:CBC         459 :     if (worker == NULL)
                                423                 :                :     {
 2637 fujii@postgresql.org      424                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 2642 peter_e@gmx.net           425         [ #  # ]:              0 :         ereport(WARNING,
                                426                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                427                 :                :                  errmsg("out of logical replication worker slots"),
                                428                 :                :                  errhint("You might need to increase %s.", "max_logical_replication_workers")));
  461 akapila@postgresql.o      429                 :              0 :         return false;
                                430                 :                :     }
                                431                 :                : 
                                432                 :                :     /* Prepare the worker slot. */
  244 akapila@postgresql.o      433                 :GNC         459 :     worker->type = wtype;
 2545 peter_e@gmx.net           434                 :CBC         459 :     worker->launch_time = now;
                                435                 :            459 :     worker->in_use = true;
                                436                 :            459 :     worker->generation++;
 2579                           437                 :            459 :     worker->proc = NULL;
 2642                           438                 :            459 :     worker->dbid = dbid;
                                439                 :            459 :     worker->userid = userid;
                                440                 :            459 :     worker->subid = subid;
 2579                           441                 :            459 :     worker->relid = relid;
                                442                 :            459 :     worker->relstate = SUBREL_STATE_UNKNOWN;
                                443                 :            459 :     worker->relstate_lsn = InvalidXLogRecPtr;
  955 akapila@postgresql.o      444                 :            459 :     worker->stream_fileset = NULL;
  452                           445         [ +  + ]:            459 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
  461                           446                 :            459 :     worker->parallel_apply = is_parallel_apply_worker;
 2579 peter_e@gmx.net           447                 :            459 :     worker->last_lsn = InvalidXLogRecPtr;
                                448                 :            459 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
                                449                 :            459 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
                                450                 :            459 :     worker->reply_lsn = InvalidXLogRecPtr;
                                451                 :            459 :     TIMESTAMP_NOBEGIN(worker->reply_time);
                                452                 :                : 
                                453                 :                :     /* Before releasing lock, remember generation for future identification. */
 2400 tgl@sss.pgh.pa.us         454                 :            459 :     generation = worker->generation;
                                455                 :                : 
 2642 peter_e@gmx.net           456                 :            459 :     LWLockRelease(LogicalRepWorkerLock);
                                457                 :                : 
                                458                 :                :     /* Register the new dynamic worker. */
 2555 tgl@sss.pgh.pa.us         459                 :            459 :     memset(&bgw, 0, sizeof(bgw));
 2524 bruce@momjian.us          460                 :            459 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                                461                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 2642 peter_e@gmx.net           462                 :            459 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
  286 nathan@postgresql.or      463                 :GNC         459 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
                                464                 :                : 
  236 akapila@postgresql.o      465   [ +  +  +  -  :            459 :     switch (worker->type)
                                                 - ]
                                466                 :                :     {
                                467                 :            276 :         case WORKERTYPE_APPLY:
                                468                 :            276 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
                                469                 :            276 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                470                 :                :                      "logical replication apply worker for subscription %u",
                                471                 :                :                      subid);
                                472                 :            276 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
                                473                 :            276 :             break;
                                474                 :                : 
                                475                 :             11 :         case WORKERTYPE_PARALLEL_APPLY:
                                476                 :             11 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
                                477                 :             11 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                478                 :                :                      "logical replication parallel apply worker for subscription %u",
                                479                 :                :                      subid);
                                480                 :             11 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
                                481                 :                : 
                                482                 :             11 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
                                483                 :             11 :             break;
                                484                 :                : 
                                485                 :            172 :         case WORKERTYPE_TABLESYNC:
                                486                 :            172 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
                                487                 :            172 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                488                 :                :                      "logical replication tablesync worker for subscription %u sync %u",
                                489                 :                :                      subid,
                                490                 :                :                      relid);
                                491                 :            172 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
                                492                 :            172 :             break;
                                493                 :                : 
  236 akapila@postgresql.o      494                 :UNC           0 :         case WORKERTYPE_UNKNOWN:
                                495                 :                :             /* Should never happen. */
                                496         [ #  # ]:              0 :             elog(ERROR, "unknown worker type");
                                497                 :                :     }
                                498                 :                : 
 2642 peter_e@gmx.net           499                 :CBC         459 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
                                500                 :            459 :     bgw.bgw_notify_pid = MyProcPid;
 2552 fujii@postgresql.org      501                 :            459 :     bgw.bgw_main_arg = Int32GetDatum(slot);
                                502                 :                : 
 2642 peter_e@gmx.net           503         [ +  + ]:            459 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
                                504                 :                :     {
                                505                 :                :         /* Failed to start worker, so clean up the worker slot. */
 2400 tgl@sss.pgh.pa.us         506                 :              4 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                507         [ -  + ]:              4 :         Assert(generation == worker->generation);
                                508                 :              4 :         logicalrep_worker_cleanup(worker);
                                509                 :              4 :         LWLockRelease(LogicalRepWorkerLock);
                                510                 :                : 
 2642 peter_e@gmx.net           511         [ +  - ]:              4 :         ereport(WARNING,
                                512                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                513                 :                :                  errmsg("out of background worker slots"),
                                514                 :                :                  errhint("You might need to increase %s.", "max_worker_processes")));
  461 akapila@postgresql.o      515                 :              4 :         return false;
                                516                 :                :     }
                                517                 :                : 
                                518                 :                :     /* Now wait until it attaches. */
                                519                 :            455 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
                                520                 :                : }
                                521                 :                : 
                                522                 :                : /*
                                523                 :                :  * Internal function to stop the worker and wait until it detaches from the
                                524                 :                :  * slot.
                                525                 :                :  */
                                526                 :                : static void
                                527                 :             60 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
                                528                 :                : {
                                529                 :                :     uint16      generation;
                                530                 :                : 
                                531         [ -  + ]:             60 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
                                532                 :                : 
                                533                 :                :     /*
                                534                 :                :      * Remember which generation was our worker so we can check if what we see
                                535                 :                :      * is still the same one.
                                536                 :                :      */
 2545 peter_e@gmx.net           537                 :             60 :     generation = worker->generation;
                                538                 :                : 
                                539                 :                :     /*
                                540                 :                :      * If we found a worker but it does not have proc set then it is still
                                541                 :                :      * starting up; wait for it to finish starting and then kill it.
                                542                 :                :      */
                                543   [ +  -  -  + ]:             60 :     while (worker->in_use && !worker->proc)
                                544                 :                :     {
                                545                 :                :         int         rc;
                                546                 :                : 
 2642 peter_e@gmx.net           547                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                548                 :                : 
                                549                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 2504 andres@anarazel.de        550                 :              0 :         rc = WaitLatch(MyLatch,
                                551                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                552                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                553                 :                : 
                                554         [ #  # ]:              0 :         if (rc & WL_LATCH_SET)
                                555                 :                :         {
                                556                 :              0 :             ResetLatch(MyLatch);
                                557         [ #  # ]:              0 :             CHECK_FOR_INTERRUPTS();
                                558                 :                :         }
                                559                 :                : 
                                560                 :                :         /* Recheck worker status. */
 2642 peter_e@gmx.net           561                 :              0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                562                 :                : 
                                563                 :                :         /*
                                564                 :                :          * Check whether the worker slot is no longer used, which would mean
                                565                 :                :          * that the worker has exited, or whether the worker generation is
                                566                 :                :          * different, meaning that a different worker has taken the slot.
                                567                 :                :          */
 2545                           568   [ #  #  #  # ]:              0 :         if (!worker->in_use || worker->generation != generation)
 2638                           569                 :              0 :             return;
                                570                 :                : 
                                571                 :                :         /* Worker has assigned proc, so it has started. */
                                572         [ #  # ]:              0 :         if (worker->proc)
 2642                           573                 :              0 :             break;
                                574                 :                :     }
                                575                 :                : 
                                576                 :                :     /* Now terminate the worker ... */
  461 akapila@postgresql.o      577                 :CBC          60 :     kill(worker->proc->pid, signo);
                                578                 :                : 
                                579                 :                :     /* ... and wait for it to die. */
                                580                 :                :     for (;;)
 2642 peter_e@gmx.net           581                 :             79 :     {
                                582                 :                :         int         rc;
                                583                 :                : 
                                584                 :                :         /* is it gone? */
 2545                           585   [ +  +  +  - ]:            139 :         if (!worker->proc || worker->generation != generation)
                                586                 :                :             break;
                                587                 :                : 
 2479 tgl@sss.pgh.pa.us         588                 :             79 :         LWLockRelease(LogicalRepWorkerLock);
                                589                 :                : 
                                590                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 2504 andres@anarazel.de        591                 :             79 :         rc = WaitLatch(MyLatch,
                                592                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                593                 :                :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
                                594                 :                : 
                                595         [ +  + ]:             79 :         if (rc & WL_LATCH_SET)
                                596                 :                :         {
                                597                 :             22 :             ResetLatch(MyLatch);
                                598         [ +  + ]:             22 :             CHECK_FOR_INTERRUPTS();
                                599                 :                :         }
                                600                 :                : 
 2479 tgl@sss.pgh.pa.us         601                 :             79 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                602                 :                :     }
                                603                 :                : }
                                604                 :                : 
                                605                 :                : /*
                                606                 :                :  * Stop the logical replication worker for subid/relid, if any.
                                607                 :                :  */
                                608                 :                : void
  461 akapila@postgresql.o      609                 :             71 : logicalrep_worker_stop(Oid subid, Oid relid)
                                610                 :                : {
                                611                 :                :     LogicalRepWorker *worker;
                                612                 :                : 
                                613                 :             71 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                614                 :                : 
                                615                 :             71 :     worker = logicalrep_worker_find(subid, relid, false);
                                616                 :                : 
                                617         [ +  + ]:             71 :     if (worker)
                                618                 :                :     {
                                619   [ +  -  -  + ]:             53 :         Assert(!isParallelApplyWorker(worker));
                                620                 :             53 :         logicalrep_worker_stop_internal(worker, SIGTERM);
                                621                 :                :     }
                                622                 :                : 
                                623                 :             71 :     LWLockRelease(LogicalRepWorkerLock);
                                624                 :             71 : }
                                625                 :                : 
                                626                 :                : /*
                                627                 :                :  * Stop the given logical replication parallel apply worker.
                                628                 :                :  *
                                629                 :                :  * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
                                630                 :                :  * worker so that the worker exits cleanly.
                                631                 :                :  */
                                632                 :                : void
  341                           633                 :              5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
                                634                 :                : {
                                635                 :                :     int         slot_no;
                                636                 :                :     uint16      generation;
                                637                 :                :     LogicalRepWorker *worker;
                                638                 :                : 
                                639         [ -  + ]:              5 :     SpinLockAcquire(&winfo->shared->mutex);
                                640                 :              5 :     generation = winfo->shared->logicalrep_worker_generation;
                                641                 :              5 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
                                642                 :              5 :     SpinLockRelease(&winfo->shared->mutex);
                                643                 :                : 
  461                           644   [ +  -  -  + ]:              5 :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
                                645                 :                : 
                                646                 :                :     /*
                                647                 :                :      * Detach from the error_mq_handle for the parallel apply worker before
                                648                 :                :      * stopping it. This prevents the leader apply worker from trying to
                                649                 :                :      * receive the message from the error queue that might already be detached
                                650                 :                :      * by the parallel apply worker.
                                651                 :                :      */
  341                           652         [ +  - ]:              5 :     if (winfo->error_mq_handle)
                                653                 :                :     {
                                654                 :              5 :         shm_mq_detach(winfo->error_mq_handle);
                                655                 :              5 :         winfo->error_mq_handle = NULL;
                                656                 :                :     }
                                657                 :                : 
  461                           658                 :              5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                659                 :                : 
                                660                 :              5 :     worker = &LogicalRepCtx->workers[slot_no];
                                661   [ +  -  -  + ]:              5 :     Assert(isParallelApplyWorker(worker));
                                662                 :                : 
                                663                 :                :     /*
                                664                 :                :      * Only stop the worker if the generation matches and the worker is alive.
                                665                 :                :      */
                                666   [ +  -  +  - ]:              5 :     if (worker->generation == generation && worker->proc)
                                667                 :              5 :         logicalrep_worker_stop_internal(worker, SIGINT);
                                668                 :                : 
 2479 tgl@sss.pgh.pa.us         669                 :              5 :     LWLockRelease(LogicalRepWorkerLock);
 2642 peter_e@gmx.net           670                 :              5 : }
                                671                 :                : 
                                672                 :                : /*
                                673                 :                :  * Wake up (using latch) any logical replication worker for specified sub/rel.
                                674                 :                :  */
                                675                 :                : void
 2579                           676                 :            191 : logicalrep_worker_wakeup(Oid subid, Oid relid)
                                677                 :                : {
                                678                 :                :     LogicalRepWorker *worker;
                                679                 :                : 
                                680                 :            191 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                681                 :                : 
                                682                 :            191 :     worker = logicalrep_worker_find(subid, relid, true);
                                683                 :                : 
                                684         [ +  - ]:            191 :     if (worker)
                                685                 :            191 :         logicalrep_worker_wakeup_ptr(worker);
                                686                 :                : 
 2480 tgl@sss.pgh.pa.us         687                 :            191 :     LWLockRelease(LogicalRepWorkerLock);
 2579 peter_e@gmx.net           688                 :            191 : }
                                689                 :                : 
                                690                 :                : /*
                                691                 :                :  * Wake up (using latch) the specified logical replication worker.
                                692                 :                :  *
                                693                 :                :  * Caller must hold lock, else worker->proc could change under us.
                                694                 :                :  */
                                695                 :                : void
                                696                 :            564 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
                                697                 :                : {
 2480 tgl@sss.pgh.pa.us         698         [ -  + ]:            564 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                699                 :                : 
 2579 peter_e@gmx.net           700                 :            564 :     SetLatch(&worker->proc->procLatch);
                                701                 :            564 : }
                                702                 :                : 
                                703                 :                : /*
                                704                 :                :  * Attach to a slot.
                                705                 :                :  */
                                706                 :                : void
 2642                           707                 :            452 : logicalrep_worker_attach(int slot)
                                708                 :                : {
                                709                 :                :     /* Block concurrent access. */
                                710                 :            452 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                711                 :                : 
                                712   [ +  -  -  + ]:            452 :     Assert(slot >= 0 && slot < max_logical_replication_workers);
                                713                 :            452 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
                                714                 :                : 
 2545                           715         [ -  + ]:            452 :     if (!MyLogicalRepWorker->in_use)
                                716                 :                :     {
 2545 peter_e@gmx.net           717                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                718         [ #  # ]:              0 :         ereport(ERROR,
                                719                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                720                 :                :                  errmsg("logical replication worker slot %d is empty, cannot attach",
                                721                 :                :                         slot)));
                                722                 :                :     }
                                723                 :                : 
 2642 peter_e@gmx.net           724         [ -  + ]:CBC         452 :     if (MyLogicalRepWorker->proc)
                                725                 :                :     {
 2545 peter_e@gmx.net           726                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 2642                           727         [ #  # ]:              0 :         ereport(ERROR,
                                728                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                729                 :                :                  errmsg("logical replication worker slot %d is already used by "
                                730                 :                :                         "another worker, cannot attach", slot)));
                                731                 :                :     }
                                732                 :                : 
 2642 peter_e@gmx.net           733                 :CBC         452 :     MyLogicalRepWorker->proc = MyProc;
                                734                 :            452 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
                                735                 :                : 
                                736                 :            452 :     LWLockRelease(LogicalRepWorkerLock);
                                737                 :            452 : }
                                738                 :                : 
                                739                 :                : /*
                                740                 :                :  * Stop the parallel apply workers if any, and detach the leader apply worker
                                741                 :                :  * (cleans up the worker info).
                                742                 :                :  */
                                743                 :                : static void
                                744                 :            442 : logicalrep_worker_detach(void)
                                745                 :                : {
                                746                 :                :     /* Stop the parallel apply workers. */
  461 akapila@postgresql.o      747         [ +  + ]:            442 :     if (am_leader_apply_worker())
                                748                 :                :     {
                                749                 :                :         List       *workers;
                                750                 :                :         ListCell   *lc;
                                751                 :                : 
                                752                 :                :         /*
                                753                 :                :          * Detach from the error_mq_handle for all parallel apply workers
                                754                 :                :          * before terminating them. This prevents the leader apply worker from
                                755                 :                :          * receiving the worker termination message and sending it to logs
                                756                 :                :          * when the same is already done by the parallel worker.
                                757                 :                :          */
                                758                 :            263 :         pa_detach_all_error_mq();
                                759                 :                : 
                                760                 :            263 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                761                 :                : 
                                762                 :            263 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
                                763   [ +  -  +  +  :            529 :         foreach(lc, workers)
                                              +  + ]
                                764                 :                :         {
                                765                 :            266 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
                                766                 :                : 
                                767   [ +  -  +  + ]:            266 :             if (isParallelApplyWorker(w))
                                768                 :              2 :                 logicalrep_worker_stop_internal(w, SIGTERM);
                                769                 :                :         }
                                770                 :                : 
                                771                 :            263 :         LWLockRelease(LogicalRepWorkerLock);
                                772                 :                :     }
                                773                 :                : 
                                774                 :                :     /* Block concurrent access. */
 2642 peter_e@gmx.net           775                 :            442 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                776                 :                : 
 2545                           777                 :            442 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
                                778                 :                : 
 2642                           779                 :            442 :     LWLockRelease(LogicalRepWorkerLock);
                                780                 :            442 : }
                                781                 :                : 
                                782                 :                : /*
                                783                 :                :  * Clean up worker info.
                                784                 :                :  */
                                785                 :                : static void
 2545                           786                 :            446 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
                                787                 :                : {
                                788         [ -  + ]:            446 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
                                789                 :                : 
  233 akapila@postgresql.o      790                 :GNC         446 :     worker->type = WORKERTYPE_UNKNOWN;
 2545 peter_e@gmx.net           791                 :CBC         446 :     worker->in_use = false;
                                792                 :            446 :     worker->proc = NULL;
                                793                 :            446 :     worker->dbid = InvalidOid;
                                794                 :            446 :     worker->userid = InvalidOid;
                                795                 :            446 :     worker->subid = InvalidOid;
                                796                 :            446 :     worker->relid = InvalidOid;
  452 akapila@postgresql.o      797                 :            446 :     worker->leader_pid = InvalidPid;
  461                           798                 :            446 :     worker->parallel_apply = false;
 2545 peter_e@gmx.net           799                 :            446 : }
                                800                 :                : 
                                801                 :                : /*
                                802                 :                :  * Cleanup function for logical replication launcher.
                                803                 :                :  *
                                804                 :                :  * Called on logical replication launcher exit.
                                805                 :                :  */
                                806                 :                : static void
 2552 fujii@postgresql.org      807                 :            358 : logicalrep_launcher_onexit(int code, Datum arg)
                                808                 :                : {
                                809                 :            358 :     LogicalRepCtx->launcher_pid = 0;
                                810                 :            358 : }
                                811                 :                : 
                                812                 :                : /*
                                813                 :                :  * Cleanup function.
                                814                 :                :  *
                                815                 :                :  * Called on logical replication worker exit.
                                816                 :                :  */
                                817                 :                : static void
 2642 peter_e@gmx.net           818                 :            442 : logicalrep_worker_onexit(int code, Datum arg)
                                819                 :                : {
                                820                 :                :     /* Disconnect gracefully from the remote side. */
 1068 alvherre@alvh.no-ip.      821         [ +  + ]:            442 :     if (LogRepWorkerWalRcvConn)
                                822                 :            335 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
                                823                 :                : 
 2642 peter_e@gmx.net           824                 :            442 :     logicalrep_worker_detach();
                                825                 :                : 
                                826                 :                :     /* Cleanup fileset used for streaming transactions. */
  955 akapila@postgresql.o      827         [ +  + ]:            442 :     if (MyLogicalRepWorker->stream_fileset != NULL)
                                828                 :             14 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
                                829                 :                : 
                                830                 :                :     /*
                                831                 :                :      * Session level locks may be acquired outside of a transaction in
                                832                 :                :      * parallel apply mode and will not be released when the worker
                                833                 :                :      * terminates, so manually release all locks before the worker exits.
                                834                 :                :      *
                                835                 :                :      * The locks will be acquired once the worker is initialized.
                                836                 :                :      */
  347                           837         [ +  + ]:            442 :     if (!InitializingApplyWorker)
                                838                 :            438 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
                                839                 :                : 
 2509 peter_e@gmx.net           840                 :            442 :     ApplyLauncherWakeup();
 2642                           841                 :            442 : }
                                842                 :                : 
                                843                 :                : /*
                                844                 :                :  * Count the number of registered (not necessarily running) sync workers
                                845                 :                :  * for a subscription.
                                846                 :                :  */
                                847                 :                : int
 2579                           848                 :           1144 : logicalrep_sync_worker_count(Oid subid)
                                849                 :                : {
                                850                 :                :     int         i;
 2524 bruce@momjian.us          851                 :           1144 :     int         res = 0;
                                852                 :                : 
 2579 peter_e@gmx.net           853         [ -  + ]:           1144 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                854                 :                : 
                                855                 :                :     /* Search for attached worker for a given subscription id. */
                                856         [ +  + ]:           6044 :     for (i = 0; i < max_logical_replication_workers; i++)
                                857                 :                :     {
 2524 bruce@momjian.us          858                 :           4900 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                859                 :                : 
  233 akapila@postgresql.o      860   [ +  +  +  +  :GNC        4900 :         if (isTablesyncWorker(w) && w->subid == subid)
                                              +  + ]
 2579 peter_e@gmx.net           861                 :CBC        1109 :             res++;
                                862                 :                :     }
                                863                 :                : 
                                864                 :           1144 :     return res;
                                865                 :                : }
                                866                 :                : 
                                867                 :                : /*
                                868                 :                :  * Count the number of registered (but not necessarily running) parallel apply
                                869                 :                :  * workers for a subscription.
                                870                 :                :  */
                                871                 :                : static int
  461 akapila@postgresql.o      872                 :            459 : logicalrep_pa_worker_count(Oid subid)
                                873                 :                : {
                                874                 :                :     int         i;
                                875                 :            459 :     int         res = 0;
                                876                 :                : 
                                877         [ -  + ]:            459 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                878                 :                : 
                                879                 :                :     /*
                                880                 :                :      * Scan all attached parallel apply workers, only counting those which
                                881                 :                :      * have the given subscription id.
                                882                 :                :      */
                                883         [ +  + ]:           2537 :     for (i = 0; i < max_logical_replication_workers; i++)
                                884                 :                :     {
                                885                 :           2078 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                886                 :                : 
  233 akapila@postgresql.o      887   [ +  +  +  +  :GNC        2078 :         if (isParallelApplyWorker(w) && w->subid == subid)
                                              +  - ]
  461 akapila@postgresql.o      888                 :CBC           2 :             res++;
                                889                 :                :     }
                                890                 :                : 
                                891                 :            459 :     return res;
                                892                 :                : }
                                893                 :                : 
                                894                 :                : /*
                                895                 :                :  * ApplyLauncherShmemSize
                                896                 :                :  *      Compute space needed for replication launcher shared memory
                                897                 :                :  */
                                898                 :                : Size
 2642 peter_e@gmx.net           899                 :           3475 : ApplyLauncherShmemSize(void)
                                900                 :                : {
                                901                 :                :     Size        size;
                                902                 :                : 
                                903                 :                :     /*
                                904                 :                :      * Need the fixed struct and the array of LogicalRepWorker.
                                905                 :                :      */
                                906                 :           3475 :     size = sizeof(LogicalRepCtxStruct);
                                907                 :           3475 :     size = MAXALIGN(size);
                                908                 :           3475 :     size = add_size(size, mul_size(max_logical_replication_workers,
                                909                 :                :                                    sizeof(LogicalRepWorker)));
                                910                 :           3475 :     return size;
                                911                 :                : }
                                912                 :                : 
                                913                 :                : /*
                                914                 :                :  * ApplyLauncherRegister
                                915                 :                :  *      Register a background worker running the logical replication launcher.
                                916                 :                :  */
                                917                 :                : void
                                918                 :            733 : ApplyLauncherRegister(void)
                                919                 :                : {
                                920                 :                :     BackgroundWorker bgw;
                                921                 :                : 
                                922                 :                :     /*
                                923                 :                :      * The logical replication launcher is disabled during binary upgrades, to
                                924                 :                :      * prevent logical replication workers from running on the source cluster.
                                925                 :                :      * That could cause replication origins to move forward after having been
                                926                 :                :      * copied to the target cluster, potentially creating conflicts with the
                                927                 :                :      * copied data files.
                                928                 :                :      */
   93 michael@paquier.xyz       929   [ +  +  +  + ]:GNC         733 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
 2642 peter_e@gmx.net           930                 :GBC          24 :         return;
                                931                 :                : 
 2555 tgl@sss.pgh.pa.us         932                 :CBC         709 :     memset(&bgw, 0, sizeof(bgw));
 2524 bruce@momjian.us          933                 :            709 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                                934                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 2642 peter_e@gmx.net           935                 :            709 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
  286 nathan@postgresql.or      936                 :GNC         709 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 2571 rhaas@postgresql.org      937                 :CBC         709 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 2642 peter_e@gmx.net           938                 :            709 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
                                939                 :                :              "logical replication launcher");
 2418                           940                 :            709 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
                                941                 :                :              "logical replication launcher");
 2642                           942                 :            709 :     bgw.bgw_restart_time = 5;
                                943                 :            709 :     bgw.bgw_notify_pid = 0;
                                944                 :            709 :     bgw.bgw_main_arg = (Datum) 0;
                                945                 :                : 
                                946                 :            709 :     RegisterBackgroundWorker(&bgw);
                                947                 :                : }
                                948                 :                : 
                                949                 :                : /*
                                950                 :                :  * ApplyLauncherShmemInit
                                951                 :                :  *      Allocate and initialize replication launcher shared memory
                                952                 :                :  */
                                953                 :                : void
                                954                 :            898 : ApplyLauncherShmemInit(void)
                                955                 :                : {
                                956                 :                :     bool        found;
                                957                 :                : 
                                958                 :            898 :     LogicalRepCtx = (LogicalRepCtxStruct *)
                                959                 :            898 :         ShmemInitStruct("Logical Replication Launcher Data",
                                960                 :                :                         ApplyLauncherShmemSize(),
                                961                 :                :                         &found);
                                962                 :                : 
                                963         [ +  - ]:            898 :     if (!found)
                                964                 :                :     {
                                965                 :                :         int         slot;
                                966                 :                : 
                                967                 :            898 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
                                968                 :                : 
  445 tgl@sss.pgh.pa.us         969                 :            898 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
                                970                 :            898 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
                                971                 :                : 
                                972                 :                :         /* Initialize memory and spin locks for each worker slot. */
 2579 peter_e@gmx.net           973         [ +  + ]:           4473 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
                                974                 :                :         {
                                975                 :           3575 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
                                976                 :                : 
                                977                 :           3575 :             memset(worker, 0, sizeof(LogicalRepWorker));
                                978                 :           3575 :             SpinLockInit(&worker->relmutex);
                                979                 :                :         }
                                980                 :                :     }
 2642                           981                 :            898 : }
                                982                 :                : 
                                983                 :                : /*
                                984                 :                :  * Initialize or attach to the dynamic shared hash table that stores the
                                985                 :                :  * last-start times, if not already done.
                                986                 :                :  * This must be called before accessing the table.
                                987                 :                :  */
                                988                 :                : static void
  448 tgl@sss.pgh.pa.us         989                 :           1041 : logicalrep_launcher_attach_dshmem(void)
                                990                 :                : {
                                991                 :                :     MemoryContext oldcontext;
                                992                 :                : 
                                993                 :                :     /* Quick exit if we already did this. */
  445                           994         [ +  + ]:           1041 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
  448                           995         [ +  + ]:            977 :         last_start_times != NULL)
                                996                 :            878 :         return;
                                997                 :                : 
                                998                 :                :     /* Otherwise, use a lock to ensure only one process creates the table. */
                                999                 :            163 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                               1000                 :                : 
                               1001                 :                :     /* Be sure any local memory allocated by DSA routines is persistent. */
                               1002                 :            163 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
                               1003                 :                : 
  445                          1004         [ +  + ]:            163 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
                               1005                 :                :     {
                               1006                 :                :         /* Initialize dynamic shared hash table for last-start times. */
  448                          1007                 :             64 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
                               1008                 :             64 :         dsa_pin(last_start_times_dsa);
                               1009                 :             64 :         dsa_pin_mapping(last_start_times_dsa);
   48 nathan@postgresql.or     1010                 :GNC          64 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
                               1011                 :                : 
                               1012                 :                :         /* Store handles in shared memory for other backends to use. */
  448 tgl@sss.pgh.pa.us        1013                 :CBC          64 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
                               1014                 :             64 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
                               1015                 :                :     }
                               1016         [ +  - ]:             99 :     else if (!last_start_times)
                               1017                 :                :     {
                               1018                 :                :         /* Attach to existing dynamic shared hash table. */
                               1019                 :             99 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
                               1020                 :             99 :         dsa_pin_mapping(last_start_times_dsa);
                               1021                 :             99 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
                               1022                 :             99 :                                          LogicalRepCtx->last_start_dsh, 0);
                               1023                 :                :     }
                               1024                 :                : 
                               1025                 :            163 :     MemoryContextSwitchTo(oldcontext);
                               1026                 :            163 :     LWLockRelease(LogicalRepWorkerLock);
                               1027                 :                : }
                               1028                 :                : 
                               1029                 :                : /*
                               1030                 :                :  * Set the last-start time for the subscription.
                               1031                 :                :  */
                               1032                 :                : static void
                               1033                 :            276 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
                               1034                 :                : {
                               1035                 :                :     LauncherLastStartTimesEntry *entry;
                               1036                 :                :     bool        found;
                               1037                 :                : 
                               1038                 :            276 :     logicalrep_launcher_attach_dshmem();
                               1039                 :                : 
                               1040                 :            276 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
                               1041                 :            276 :     entry->last_start_time = start_time;
                               1042                 :            276 :     dshash_release_lock(last_start_times, entry);
                               1043                 :            276 : }
                               1044                 :                : 
                               1045                 :                : /*
                               1046                 :                :  * Return the last-start time for the subscription, or 0 if there isn't one.
                               1047                 :                :  */
                               1048                 :                : static TimestampTz
                               1049                 :            637 : ApplyLauncherGetWorkerStartTime(Oid subid)
                               1050                 :                : {
                               1051                 :                :     LauncherLastStartTimesEntry *entry;
                               1052                 :                :     TimestampTz ret;
                               1053                 :                : 
                               1054                 :            637 :     logicalrep_launcher_attach_dshmem();
                               1055                 :                : 
                               1056                 :            637 :     entry = dshash_find(last_start_times, &subid, false);
                               1057         [ +  + ]:            637 :     if (entry == NULL)
                               1058                 :            149 :         return 0;
                               1059                 :                : 
                               1060                 :            488 :     ret = entry->last_start_time;
                               1061                 :            488 :     dshash_release_lock(last_start_times, entry);
                               1062                 :                : 
                               1063                 :            488 :     return ret;
                               1064                 :                : }
                               1065                 :                : 
                               1066                 :                : /*
                               1067                 :                :  * Remove the last-start-time entry for the subscription, if one exists.
                               1068                 :                :  *
                               1069                 :                :  * This has two use-cases: to remove the entry related to a subscription
                               1070                 :                :  * that's been deleted or disabled (just to avoid leaking shared memory),
                               1071                 :                :  * and to allow immediate restart of an apply worker that has exited
                               1072                 :                :  * due to subscription parameter changes.
                               1073                 :                :  */
                               1074                 :                : void
                               1075                 :            128 : ApplyLauncherForgetWorkerStartTime(Oid subid)
                               1076                 :                : {
                               1077                 :            128 :     logicalrep_launcher_attach_dshmem();
                               1078                 :                : 
                               1079                 :            128 :     (void) dshash_delete_key(last_start_times, &subid);
                               1080                 :            128 : }
                               1081                 :                : 
                               1082                 :                : /*
                               1083                 :                :  * Wakeup the launcher on commit if requested.
                               1084                 :                :  */
                               1085                 :                : void
 2540 peter_e@gmx.net          1086                 :         432906 : AtEOXact_ApplyLauncher(bool isCommit)
                               1087                 :                : {
 2445                          1088         [ +  + ]:         432906 :     if (isCommit)
                               1089                 :                :     {
                               1090         [ +  + ]:         409700 :         if (on_commit_launcher_wakeup)
                               1091                 :            116 :             ApplyLauncherWakeup();
                               1092                 :                :     }
                               1093                 :                : 
 2540                          1094                 :         432906 :     on_commit_launcher_wakeup = false;
 2642                          1095                 :         432906 : }
                               1096                 :                : 
                               1097                 :                : /*
                               1098                 :                :  * Request wakeup of the launcher on commit of the transaction.
                               1099                 :                :  *
                               1100                 :                :  * This is used to send launcher signal to stop sleeping and process the
                               1101                 :                :  * subscriptions when current transaction commits. Should be used when new
                               1102                 :                :  * tuple was added to the pg_subscription catalog.
                               1103                 :                : */
                               1104                 :                : void
                               1105                 :            116 : ApplyLauncherWakeupAtCommit(void)
                               1106                 :                : {
 2624 heikki.linnakangas@i     1107         [ +  - ]:            116 :     if (!on_commit_launcher_wakeup)
                               1108                 :            116 :         on_commit_launcher_wakeup = true;
 2642 peter_e@gmx.net          1109                 :            116 : }
                               1110                 :                : 
                               1111                 :                : static void
                               1112                 :            558 : ApplyLauncherWakeup(void)
                               1113                 :                : {
 2552 fujii@postgresql.org     1114         [ +  + ]:            558 :     if (LogicalRepCtx->launcher_pid != 0)
 2642 peter_e@gmx.net          1115                 :            537 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
                               1116                 :            558 : }
                               1117                 :                : 
                               1118                 :                : /*
                               1119                 :                :  * Main loop for the apply launcher process.
                               1120                 :                :  */
                               1121                 :                : void
                               1122                 :            618 : ApplyLauncherMain(Datum main_arg)
                               1123                 :                : {
 2592 tgl@sss.pgh.pa.us        1124         [ +  + ]:            618 :     ereport(DEBUG1,
                               1125                 :                :             (errmsg_internal("logical replication launcher started")));
                               1126                 :                : 
 2552 fujii@postgresql.org     1127                 :            618 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
                               1128                 :                : 
 2502 andres@anarazel.de       1129         [ -  + ]:            618 :     Assert(LogicalRepCtx->launcher_pid == 0);
                               1130                 :            618 :     LogicalRepCtx->launcher_pid = MyProcPid;
                               1131                 :                : 
                               1132                 :                :     /* Establish signal handlers. */
 1367 akapila@postgresql.o     1133                 :            618 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
 2502 andres@anarazel.de       1134                 :            618 :     pqsignal(SIGTERM, die);
 2642 peter_e@gmx.net          1135                 :            618 :     BackgroundWorkerUnblockSignals();
                               1136                 :                : 
                               1137                 :                :     /*
                               1138                 :                :      * Establish connection to nailed catalogs (we only ever access
                               1139                 :                :      * pg_subscription).
                               1140                 :                :      */
 2201 magnus@hagander.net      1141                 :            618 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
                               1142                 :                : 
                               1143                 :                :     /* Enter main loop */
                               1144                 :                :     for (;;)
 2642 peter_e@gmx.net          1145                 :           2507 :     {
                               1146                 :                :         int         rc;
                               1147                 :                :         List       *sublist;
                               1148                 :                :         ListCell   *lc;
                               1149                 :                :         MemoryContext subctx;
                               1150                 :                :         MemoryContext oldctx;
 2524 bruce@momjian.us         1151                 :           3125 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
                               1152                 :                : 
 2502 andres@anarazel.de       1153         [ +  + ]:           3125 :         CHECK_FOR_INTERRUPTS();
                               1154                 :                : 
                               1155                 :                :         /* Use temporary context to avoid leaking memory across cycles. */
  448 tgl@sss.pgh.pa.us        1156                 :           3124 :         subctx = AllocSetContextCreate(TopMemoryContext,
                               1157                 :                :                                        "Logical Replication Launcher sublist",
                               1158                 :                :                                        ALLOCSET_DEFAULT_SIZES);
                               1159                 :           3124 :         oldctx = MemoryContextSwitchTo(subctx);
                               1160                 :                : 
                               1161                 :                :         /* Start any missing workers for enabled subscriptions. */
                               1162                 :           3124 :         sublist = get_subscription_list();
                               1163   [ +  +  +  +  :           4136 :         foreach(lc, sublist)
                                              +  + ]
                               1164                 :                :         {
                               1165                 :           1014 :             Subscription *sub = (Subscription *) lfirst(lc);
                               1166                 :                :             LogicalRepWorker *w;
                               1167                 :                :             TimestampTz last_start;
                               1168                 :                :             TimestampTz now;
                               1169                 :                :             long        elapsed;
                               1170                 :                : 
                               1171         [ +  + ]:           1014 :             if (!sub->enabled)
                               1172                 :             31 :                 continue;
                               1173                 :                : 
                               1174                 :            983 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1175                 :            983 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
                               1176                 :            983 :             LWLockRelease(LogicalRepWorkerLock);
                               1177                 :                : 
                               1178         [ +  + ]:            983 :             if (w != NULL)
                               1179                 :            346 :                 continue;       /* worker is running already */
                               1180                 :                : 
                               1181                 :                :             /*
                               1182                 :                :              * If the worker is eligible to start now, launch it.  Otherwise,
                               1183                 :                :              * adjust wait_time so that we'll wake up as soon as it can be
                               1184                 :                :              * started.
                               1185                 :                :              *
                               1186                 :                :              * Each subscription's apply worker can only be restarted once per
                               1187                 :                :              * wal_retrieve_retry_interval, so that errors do not cause us to
                               1188                 :                :              * repeatedly restart the worker as fast as possible.  In cases
                               1189                 :                :              * where a restart is expected (e.g., subscription parameter
                               1190                 :                :              * changes), another process should remove the last-start entry
                               1191                 :                :              * for the subscription so that the worker can be restarted
                               1192                 :                :              * without waiting for wal_retrieve_retry_interval to elapse.
                               1193                 :                :              */
                               1194                 :            637 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
                               1195                 :            637 :             now = GetCurrentTimestamp();
                               1196         [ +  + ]:            637 :             if (last_start == 0 ||
                               1197         [ +  + ]:            488 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
                               1198                 :                :             {
                               1199                 :            276 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
  244 akapila@postgresql.o     1200                 :GNC         276 :                 logicalrep_worker_launch(WORKERTYPE_APPLY,
                               1201                 :            276 :                                          sub->dbid, sub->oid, sub->name,
                               1202                 :                :                                          sub->owner, InvalidOid,
                               1203                 :                :                                          DSM_HANDLE_INVALID);
                               1204                 :                :             }
                               1205                 :                :             else
                               1206                 :                :             {
  448 tgl@sss.pgh.pa.us        1207                 :CBC         361 :                 wait_time = Min(wait_time,
                               1208                 :                :                                 wal_retrieve_retry_interval - elapsed);
                               1209                 :                :             }
                               1210                 :                :         }
                               1211                 :                : 
                               1212                 :                :         /* Switch back to original memory context. */
                               1213                 :           3122 :         MemoryContextSwitchTo(oldctx);
                               1214                 :                :         /* Clean the temporary memory. */
                               1215                 :           3122 :         MemoryContextDelete(subctx);
                               1216                 :                : 
                               1217                 :                :         /* Wait for more work. */
 2504 andres@anarazel.de       1218                 :           3122 :         rc = WaitLatch(MyLatch,
                               1219                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1220                 :                :                        wait_time,
                               1221                 :                :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
                               1222                 :                : 
                               1223         [ +  + ]:           2861 :         if (rc & WL_LATCH_SET)
                               1224                 :                :         {
                               1225                 :           2774 :             ResetLatch(MyLatch);
                               1226         [ +  + ]:           2774 :             CHECK_FOR_INTERRUPTS();
                               1227                 :                :         }
                               1228                 :                : 
 1580 rhaas@postgresql.org     1229         [ +  + ]:           2507 :         if (ConfigReloadPending)
                               1230                 :                :         {
                               1231                 :            100 :             ConfigReloadPending = false;
 2561 peter_e@gmx.net          1232                 :            100 :             ProcessConfigFile(PGC_SIGHUP);
                               1233                 :                :         }
                               1234                 :                :     }
                               1235                 :                : 
                               1236                 :                :     /* Not reachable */
                               1237                 :                : }
                               1238                 :                : 
                               1239                 :                : /*
                               1240                 :                :  * Is current process the logical replication launcher?
                               1241                 :                :  */
                               1242                 :                : bool
 2502 andres@anarazel.de       1243                 :            370 : IsLogicalLauncher(void)
                               1244                 :                : {
                               1245                 :            370 :     return LogicalRepCtx->launcher_pid == MyProcPid;
                               1246                 :                : }
                               1247                 :                : 
                               1248                 :                : /*
                               1249                 :                :  * Return the pid of the leader apply worker if the given pid is the pid of a
                               1250                 :                :  * parallel apply worker, otherwise, return InvalidPid.
                               1251                 :                :  */
                               1252                 :                : pid_t
  452 akapila@postgresql.o     1253                 :            794 : GetLeaderApplyWorkerPid(pid_t pid)
                               1254                 :                : {
                               1255                 :            794 :     int         leader_pid = InvalidPid;
                               1256                 :                :     int         i;
                               1257                 :                : 
                               1258                 :            794 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1259                 :                : 
                               1260         [ +  + ]:           3970 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1261                 :                :     {
                               1262                 :           3176 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                               1263                 :                : 
                               1264   [ +  +  -  +  :           3176 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
                                        -  -  -  - ]
                               1265                 :                :         {
  452 akapila@postgresql.o     1266                 :UBC           0 :             leader_pid = w->leader_pid;
                               1267                 :              0 :             break;
                               1268                 :                :         }
                               1269                 :                :     }
                               1270                 :                : 
  452 akapila@postgresql.o     1271                 :CBC         794 :     LWLockRelease(LogicalRepWorkerLock);
                               1272                 :                : 
                               1273                 :            794 :     return leader_pid;
                               1274                 :                : }
                               1275                 :                : 
                               1276                 :                : /*
                               1277                 :                :  * Returns state of the subscriptions.
                               1278                 :                :  */
                               1279                 :                : Datum
 2642 peter_e@gmx.net          1280                 :              1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
                               1281                 :                : {
                               1282                 :                : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
                               1283         [ -  + ]:              1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
                               1284                 :                :     int         i;
                               1285                 :              1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1286                 :                : 
  544 michael@paquier.xyz      1287                 :              1 :     InitMaterializedSRF(fcinfo, 0);
                               1288                 :                : 
                               1289                 :                :     /* Make sure we get consistent view of the workers. */
 2642 peter_e@gmx.net          1290                 :              1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1291                 :                : 
  677 tgl@sss.pgh.pa.us        1292         [ +  + ]:              5 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1293                 :                :     {
                               1294                 :                :         /* for each row */
  638 peter@eisentraut.org     1295                 :              4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1296                 :              4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1297                 :                :         int         worker_pid;
                               1298                 :                :         LogicalRepWorker worker;
                               1299                 :                : 
 2642 peter_e@gmx.net          1300                 :              4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
                               1301                 :                :                sizeof(LogicalRepWorker));
                               1302   [ +  +  -  + ]:              4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
                               1303                 :              2 :             continue;
                               1304                 :                : 
                               1305   [ -  +  -  - ]:              2 :         if (OidIsValid(subid) && worker.subid != subid)
 2642 peter_e@gmx.net          1306                 :UBC           0 :             continue;
                               1307                 :                : 
 2642 peter_e@gmx.net          1308                 :CBC           2 :         worker_pid = worker.proc->pid;
                               1309                 :                : 
                               1310                 :              2 :         values[0] = ObjectIdGetDatum(worker.subid);
  244 akapila@postgresql.o     1311   [ +  -  -  + ]:GNC           2 :         if (isTablesyncWorker(&worker))
 2579 peter_e@gmx.net          1312                 :UBC           0 :             values[1] = ObjectIdGetDatum(worker.relid);
                               1313                 :                :         else
 2579 peter_e@gmx.net          1314                 :CBC           2 :             nulls[1] = true;
                               1315                 :              2 :         values[2] = Int32GetDatum(worker_pid);
                               1316                 :                : 
  452 akapila@postgresql.o     1317   [ +  -  -  + ]:              2 :         if (isParallelApplyWorker(&worker))
  452 akapila@postgresql.o     1318                 :UBC           0 :             values[3] = Int32GetDatum(worker.leader_pid);
                               1319                 :                :         else
 2579 peter_e@gmx.net          1320                 :CBC           2 :             nulls[3] = true;
                               1321                 :                : 
  452 akapila@postgresql.o     1322         [ -  + ]:              2 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
  452 akapila@postgresql.o     1323                 :UBC           0 :             nulls[4] = true;
                               1324                 :                :         else
  452 akapila@postgresql.o     1325                 :CBC           2 :             values[4] = LSNGetDatum(worker.last_lsn);
 2642 peter_e@gmx.net          1326         [ -  + ]:              2 :         if (worker.last_send_time == 0)
  452 akapila@postgresql.o     1327                 :UBC           0 :             nulls[5] = true;
                               1328                 :                :         else
  452 akapila@postgresql.o     1329                 :CBC           2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
 2642 peter_e@gmx.net          1330         [ -  + ]:              2 :         if (worker.last_recv_time == 0)
  452 akapila@postgresql.o     1331                 :UBC           0 :             nulls[6] = true;
                               1332                 :                :         else
  452 akapila@postgresql.o     1333                 :CBC           2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
 2642 peter_e@gmx.net          1334         [ -  + ]:              2 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
  452 akapila@postgresql.o     1335                 :UBC           0 :             nulls[7] = true;
                               1336                 :                :         else
  452 akapila@postgresql.o     1337                 :CBC           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
 2642 peter_e@gmx.net          1338         [ -  + ]:              2 :         if (worker.reply_time == 0)
  452 akapila@postgresql.o     1339                 :UBC           0 :             nulls[8] = true;
                               1340                 :                :         else
  452 akapila@postgresql.o     1341                 :CBC           2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
                               1342                 :                : 
  202 nathan@postgresql.or     1343   [ +  -  -  -  :GNC           2 :         switch (worker.type)
                                                 - ]
                               1344                 :                :         {
                               1345                 :              2 :             case WORKERTYPE_APPLY:
                               1346                 :              2 :                 values[9] = CStringGetTextDatum("apply");
                               1347                 :              2 :                 break;
  202 nathan@postgresql.or     1348                 :UNC           0 :             case WORKERTYPE_PARALLEL_APPLY:
                               1349                 :              0 :                 values[9] = CStringGetTextDatum("parallel apply");
                               1350                 :              0 :                 break;
                               1351                 :              0 :             case WORKERTYPE_TABLESYNC:
                               1352                 :              0 :                 values[9] = CStringGetTextDatum("table synchronization");
                               1353                 :              0 :                 break;
                               1354                 :              0 :             case WORKERTYPE_UNKNOWN:
                               1355                 :                :                 /* Should never happen. */
                               1356         [ #  # ]:              0 :                 elog(ERROR, "unknown worker type");
                               1357                 :                :         }
                               1358                 :                : 
  769 michael@paquier.xyz      1359                 :CBC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                               1360                 :                :                              values, nulls);
                               1361                 :                : 
                               1362                 :                :         /*
                               1363                 :                :          * If only a single subscription was requested, and we found it,
                               1364                 :                :          * break.
                               1365                 :                :          */
 2642 peter_e@gmx.net          1366         [ -  + ]:              2 :         if (OidIsValid(subid))
 2642 peter_e@gmx.net          1367                 :UBC           0 :             break;
                               1368                 :                :     }
                               1369                 :                : 
 2642 peter_e@gmx.net          1370                 :CBC           1 :     LWLockRelease(LogicalRepWorkerLock);
                               1371                 :                : 
                               1372                 :              1 :     return (Datum) 0;
                               1373                 :                : }
        

Generated by: LCOV version 2.1-beta2-3-g6141622